Author: kwall
Date: Thu Oct 29 22:58:48 2015
New Revision: 1711375

URL: http://svn.apache.org/viewvc?rev=1711375&view=rev
Log:
QPID-6780: [Java Client 0-10] Ensure that sessions, producers, consumers and associated threads are closed when connection is closed by peer or dropped

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1711375&r1=1711374&r2=1711375&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Oct 29 22:58:48 2015 @@ -365,6 +365,10 @@ public class AMQConnectionDelegate_0_10 } + for(AMQSession<?,?> session : _conn.getSessions().values()) + { + session.markClosed(); + } _conn.setClosed(); Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1711375&r1=1711374&r2=1711375&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 29 22:58:48 2015 @@ -2225,9 +2225,7 @@ public abstract class AMQSession<C exten /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover - * when the client has veoted resubscription. - * <p> - * The caller of this method must already hold the failover mutex. + * when the client has vetoed resubscription. 
*/ void markClosed() { Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1711375&r1=1711374&r2=1711375&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Thu Oct 29 22:58:48 2015 @@ -27,10 +27,13 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.naming.NamingException; +import javax.jms.Queue; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.ConnectionException; @@ -49,6 +52,8 @@ public class BrokerClosesClientConnectio private boolean _isExternalBroker; private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener(); private Session _session; + private MessageConsumer _consumer; + private MessageProducer _producer; @Override protected void setUp() throws Exception @@ -56,8 +61,12 @@ public class BrokerClosesClientConnectio super.setUp(); _connection = getConnection(); + _connection.start(); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); _connection.setExceptionListener(_recordingExceptionListener); + Queue queue = _session.createTemporaryQueue(); + 
_consumer = _session.createConsumer(queue); + _producer = _session.createProducer(queue); _isExternalBroker = isExternalBroker(); } @@ -66,13 +75,13 @@ public class BrokerClosesClientConnectio { final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class; - assertConnectionOpen(); + assertJmsObjectsOpen(); stopBroker(); JMSException exception = _recordingExceptionListener.awaitException(10000); assertConnectionCloseWasReported(exception, expectedLinkedException); - assertConnectionClosed(); + assertJmsObjectsClosed(); ensureCanCloseWithoutException(); } @@ -86,13 +95,13 @@ public class BrokerClosesClientConnectio return; } - assertConnectionOpen(); + assertJmsObjectsOpen(); killBroker(); JMSException exception = _recordingExceptionListener.awaitException(10000); assertConnectionCloseWasReported(exception, expectedLinkedException); - assertConnectionClosed(); + assertJmsObjectsClosed(); ensureCanCloseWithoutException(); } @@ -117,14 +126,20 @@ public class BrokerClosesClientConnectio assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass()); } - private void assertConnectionClosed() + private void assertJmsObjectsClosed() { - assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed()); + assertTrue("Connection should be marked as closed", ((AMQConnection) _connection).isClosed()); + assertTrue("Session should be marked as closed", ((AMQSession) _session).isClosed()); + assertTrue("Producer should be marked as closed", ((BasicMessageProducer) _producer).isClosed()); + assertTrue("Consumer should be marked as closed", ((BasicMessageConsumer) _consumer).isClosed()); } - private void assertConnectionOpen() + private void assertJmsObjectsOpen() { - assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed()); + assertFalse("Connection should not be marked as closed", 
((AMQConnection) _connection).isClosed()); + assertFalse("Session should not be marked as closed", ((AMQSession) _session).isClosed()); + assertFalse("Producer should not be marked as closed", ((BasicMessageProducer) _producer).isClosed()); + assertFalse("Consumer should not be marked as closed", ((BasicMessageConsumer) _consumer).isClosed()); } private final class RecordingExceptionListener implements ExceptionListener @@ -162,7 +177,7 @@ public class BrokerClosesClientConnectio } } - public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException + public void testNoDeliveryAfterBrokerClose() throws Exception { Listener listener = new Listener(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org