Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Aug 12 20:29:29 2011 @@ -20,16 +20,22 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InterruptedIOException; -import java.net.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; + import javax.net.SocketFactory; + import org.apache.activemq.Service; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.transport.Transport; @@ -64,7 +70,6 @@ public class TcpTransport extends Transp protected DataInputStream dataIn; protected TimeStampStream buffOut = null; - /** * The Traffic Class to be set on the socket. */ @@ -636,7 +641,6 @@ public class TcpTransport extends Transp return receiveCounter; } - /** * @param sock The socket on which to set the Traffic Class. * @return Whether or not the Traffic Class was set on the given socket.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Fri Aug 12 20:29:29 2011 @@ -79,6 +79,7 @@ public class TcpTransportFactory extends return new TcpTransportServer(this, location, serverSocketFactory); } + @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class); @@ -98,11 +99,10 @@ public class TcpTransportFactory extends boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); if (useInactivityMonitor && isUseInactivityMonitor(transport)) { - transport = new InactivityMonitor(transport, format); + transport = createInactivityMonitor(transport, format); IntrospectionSupport.setProperties(transport, options); } - // Only need the WireFormatNegotiator if using openwire if (format instanceof OpenWireFormat) { transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); @@ -162,4 +162,8 @@ public class TcpTransportFactory extends protected SocketFactory createSocketFactory() throws IOException { return SocketFactory.getDefault(); } + + protected Transport createInactivityMonitor(Transport transport, WireFormat format) { + return new InactivityMonitor(transport, format); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=1157238&r1=1157237&r2=1157238&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Fri Aug 12 20:29:29 2011 @@ -27,7 +27,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1157238&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Fri Aug 12 20:29:29 2011 @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.DataInputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Stomp11Test extends CombinationTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(StompTest.class); + + protected String bindAddress = "stomp://localhost:61613"; + protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; + protected String jmsUri = "vm://localhost"; + + private BrokerService broker; + private StompConnection stompConnection = new StompConnection(); + + @Override + protected void setUp() throws Exception { + + broker = BrokerFactory.createBroker(new URI(confUri)); + broker.start(); + broker.waitUntilStarted(); + + stompConnect(); + } + + private void stompConnect() throws IOException, URISyntaxException, UnknownHostException { + URI connectUri = new URI(bindAddress); + stompConnection.open(createSocket(connectUri)); + } + + protected Socket createSocket(URI connectUri) throws IOException { + return new Socket("127.0.0.1", connectUri.getPort()); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + + @Override + protected void tearDown() throws Exception { + try { + stompDisconnect(); + } catch(Exception e) { + // Some tests explicitly disconnect from stomp so can ignore + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + + private void stompDisconnect() throws IOException { + if (stompConnection != null) { + stompConnection.close(); + stompConnection = null; + } + } + + public void testConnect() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "request-id: 1\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("response-id:1") >= 0); + assertTrue(f.indexOf("version:1.1") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testConnectWithVersionOptions() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.0,1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.1") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testConnectWithValidFallback() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.0,10.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.0") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testConnectWithInvalidFallback() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:9.0,10.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("ERROR")); + assertTrue(f.indexOf("version") >= 0); + assertTrue(f.indexOf("message:") >= 0); + } + + public void testHeartbeats() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "heart-beat:0,1000\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(connectFrame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.1") >= 0); + assertTrue(f.indexOf("heart-beat:") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + LOG.debug("Broker sent: " + f); + + stompConnection.getStompSocket().getOutputStream().write('\n'); + + DataInputStream in = new DataInputStream(stompConnection.getStompSocket().getInputStream()); + in.read(); + { + long startTime = System.currentTimeMillis(); + int input = in.read(); + assertEquals("did not receive the correct hear beat value", '\n', input); + long endTime = System.currentTimeMillis(); + assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900); + } + { + long startTime = System.currentTimeMillis(); + int input = in.read(); + assertEquals("did not receive the correct hear beat value", '\n', input); + long endTime = System.currentTimeMillis(); + assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900); + } + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testHeartbeatsDropsIdleConnection() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "heart-beat:1000,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(connectFrame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.1") >= 0); + assertTrue(f.indexOf("heart-beat:") >= 0); + assertTrue(f.indexOf("session:") >= 0); + LOG.debug("Broker sent: " + f); + + long startTime = System.currentTimeMillis(); + + try { + f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + fail(); + } catch(Exception e) { + } + + long endTime = System.currentTimeMillis(); + assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000); + } + + public void testRejectInvalidHeartbeats1() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "heart-beat:0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("ERROR")); + assertTrue(f.indexOf("heart-beat") >= 0); + assertTrue(f.indexOf("message:") >= 0); + } + + public void testRejectInvalidHeartbeats2() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "heart-beat:T,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("ERROR")); + assertTrue(f.indexOf("heart-beat") >= 0); + assertTrue(f.indexOf("message:") >= 0); + } + + public void testRejectInvalidHeartbeats3() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "heart-beat:100,10,50\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("ERROR")); + assertTrue(f.indexOf("heart-beat") >= 0); + assertTrue(f.indexOf("message:") >= 0); + } + + public void testSubscribeAndUnsubscribe() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(message); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + Thread.sleep(2000); + + stompConnection.sendFrame(message); + + try { + frame = stompConnection.receiveFrame(); + LOG.info("Received frame: " + frame); + fail("No message should have been received since subscription was removed"); + } catch (SocketTimeoutException e) { + } + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testSubscribeWithNoId() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testUnsubscribeWithNoId() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + Thread.sleep(2000); + + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testAckMessageWithId() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(message); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + + frame = "ACK\n" + "subscription:12345\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testAckMessageWithNoId() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(message); + + String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(subscribe); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + + String ack = "ACK\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(ack); + + StompFrame error = stompConnection.receive(); + assertTrue(error.getAction().equals("ERROR")); + + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(unsub); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + public void testQueueBrowerSubscription() throws Exception { + + final int MSG_COUNT = 10; + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + for(int i = 0; i < MSG_COUNT; ++i) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "receipt:0\n" + + "\n" + "Hello World {" + i + "}" + Stomp.NULL; + stompConnection.sendFrame(message); + StompFrame repsonse = stompConnection.receive(); + assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID)); + } + + String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "browser:true\n\n" + Stomp.NULL; + stompConnection.sendFrame(subscribe); + + for(int i = 0; i < MSG_COUNT; ++i) { + StompFrame message = stompConnection.receive(); + assertEquals(Stomp.Responses.MESSAGE, message.getAction()); + assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION)); + } + + // We should now get a browse done message + StompFrame browseDone = stompConnection.receive(); + LOG.debug("Browse Done: " + browseDone.toString()); + assertEquals(Stomp.Responses.MESSAGE, browseDone.getAction()); + assertEquals("12345", browseDone.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION)); + assertEquals("end", browseDone.getHeaders().get(Stomp.Headers.Message.BROWSER)); + assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); + + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(unsub); + + Thread.sleep(2000); + + subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(subscribe); + + for(int i = 0; i < MSG_COUNT; ++i) { + StompFrame message = stompConnection.receive(); + assertEquals(Stomp.Responses.MESSAGE, message.getAction()); + assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION)); + } + + stompConnection.sendFrame(unsub); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1157238&r1=1157237&r2=1157238&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Aug 12 20:29:29 2011 @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; @@ -36,6 +37,7 @@ import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.management.ObjectName; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; @@ -318,7 +320,7 @@ public class StompTest extends Combinati assertEquals("Hello World", message.getText()); assertEquals("getJMSPriority", 4, message.getJMSPriority()); } - + public void testReceipts() throws Exception { StompConnection receiver = new StompConnection(); @@ -449,7 +451,7 @@ public class StompTest extends Combinati public void testSendMultipleBytesMessages() throws Exception { - final int MSG_COUNT = 50; + final int MSG_COUNT = 50; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1157238&r1=1157237&r2=1157238&view=diff ============================================================================== --- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original) +++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Fri Aug 12 20:29:29 2011 @@ -21,9 +21,9 @@ import java.security.cert.X509Certificat import org.apache.activemq.command.Command; import org.apache.activemq.transport.TransportSupport; -import org.apache.activemq.transport.stomp.LegacyFrameTranslator; import org.apache.activemq.transport.stomp.ProtocolConverter; import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompInactivityMonitor; import org.apache.activemq.transport.stomp.StompTransport; import org.apache.activemq.transport.stomp.StompWireFormat; import org.apache.activemq.util.ByteSequence; @@ -32,19 +32,19 @@ import org.apache.activemq.util.ServiceS import org.eclipse.jetty.websocket.WebSocket; /** - * + * * Implements web socket and mediates between servlet and the broker * */ class StompSocket extends TransportSupport implements WebSocket, StompTransport { Outbound outbound; - ProtocolConverter protocolConverter = new ProtocolConverter(this, new LegacyFrameTranslator(), null); + ProtocolConverter protocolConverter = new ProtocolConverter(this, null); StompWireFormat wireFormat = new StompWireFormat(); public void onConnect(Outbound outbound) { this.outbound=outbound; } - + public void onMessage(byte frame, byte[] data,int offset, int length) {} public void onMessage(byte frame, String data) { @@ -91,4 +91,14 @@ class StompSocket extends TransportSuppo public void sendToStomp(StompFrame command) throws IOException { outbound.sendMessage(WebSocket.SENTINEL_FRAME, command.format()); } + + @Override + public StompInactivityMonitor getInactivityMonitor() { + return null; + } + + @Override + public StompWireFormat getWireFormat() { + return this.wireFormat; + } }
