Author: jstrachan Date: Fri Jan 19 10:36:41 2007 New Revision: 497898 URL: http://svn.apache.org/viewvc?view=rev&rev=497898 Log: Fix for AMQ-1134 so that STOMP connections are cleaned up by the broker if a STOMP client is killed without disconnecting properly.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Fri Jan 19 10:36:41 2007 @@ -38,4 +38,9 @@ transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); return super.compositeConfigure(transport, format, options); } + + protected boolean isUseInactivityMonitor(Transport transport) { + // lets disable the inactivity monitor as stomp does not use keep alive packets + return false; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=497898&r1=497897&r2=497898 ============================================================================== --- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Jan 19 10:36:41 2007 @@ -356,6 +356,10 @@ } protected void doStop(ServiceStopper stopper) throws Exception { + if (log.isDebugEnabled()) { + log.debug("Stopping transport " + this); + } + // Closing the streams flush the sockets before closing.. if the socket // is hung.. then this hangs the close. // closeStreams(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=497898&r1=497897&r2=497898 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Fri Jan 19 10:36:41 2007 @@ -87,7 +87,9 @@ transport = new TransportLogger(transport); } - transport = new InactivityMonitor(transport); + if (isUseInactivityMonitor(transport)) { + transport = new InactivityMonitor(transport); + } // Only need the WireFormatNegotiator if using openwire if( format instanceof OpenWireFormat ) { @@ -95,6 +97,13 @@ } return transport; + } + + /** + * Returns true if the inactivity monitor should be used on the transport + */ + protected boolean isUseInactivityMonitor(Transport transport) { + return true; } protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{ Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=497898&r1=497897&r2=497898 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java Fri Jan 19 10:36:41 2007 @@ -20,6 +20,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.util.UDPTraceBrokerPlugin; import org.apache.activemq.broker.view.ConnectionDotFilePlugin; import org.apache.activemq.broker.view.DestinationDotFilePlugin; @@ -52,8 +53,15 @@ // URI(brokerURI)); BrokerService broker = new BrokerService(); broker.setPersistent(false); + + // for running on Java 5 without mx4j + ManagementContext managementContext = broker.getManagementContext(); + managementContext.setFindTigerMbeanServer(true); + managementContext.setUseMBeanServer(true); + managementContext.setCreateConnector(false); + broker.setUseJmx(true); - broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); + //broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); broker.addConnector("tcp://localhost:61616"); broker.addConnector("stomp://localhost:61613"); broker.start(); Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?view=diff&rev=497898&r1=497897&r2=497898 ============================================================================== --- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jan 19 10:36:41 2007 @@ -19,14 +19,13 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.*; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.transport.stomp.Stomp; import javax.jms.*; - +import javax.jms.Connection; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -64,8 +63,6 @@ session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); queue = new ActiveMQQueue(getQueueName()); connection.start(); - - } protected Socket createSocket(URI connectUri) throws IOException { @@ -78,7 +75,9 @@ protected void tearDown() throws Exception { connection.close(); - stompSocket.close(); + if (stompSocket != null) { + stompSocket.close(); + } broker.stop(); } @@ -679,6 +678,37 @@ TextMessage message = (TextMessage) consumer.receive(1000); assertNotNull(message); assertEquals("second message", message.getText().trim()); + } + + public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception { + assertClients(1); + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + + sendFrame(frame); + + // This test case is currently failing + waitForFrameToTakeEffect(); + + assertClients(2); + + // now lets kill the socket + stompSocket.close(); + stompSocket = null; + + Thread.sleep(2000); + + assertClients(1); + } + + protected void assertClients(int expected) throws Exception { + org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); + int actual = 
clients.length; + + assertEquals("Number of clients", expected, actual); } protected void waitForFrameToTakeEffect() throws InterruptedException {