Author: kwall
Date: Thu Oct 29 22:58:48 2015
New Revision: 1711375

URL: http://svn.apache.org/viewvc?rev=1711375&view=rev
Log:
QPID-6780: [Java Client 0-10] Ensure that sessions, producers, consumers and 
associated threads are closed when connection is closed by peer or dropped

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1711375&r1=1711374&r2=1711375&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Oct 29 22:58:48 2015
@@ -365,6 +365,10 @@ public class AMQConnectionDelegate_0_10
 
         }
 
+        for(AMQSession<?,?> session :  _conn.getSessions().values())
+        {
+            session.markClosed();
+        }
 
         _conn.setClosed();
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1711375&r1=1711374&r2=1711375&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 29 22:58:48 2015
@@ -2225,9 +2225,7 @@ public abstract class AMQSession<C exten
 
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
-     * when the client has veoted resubscription.
-     * <p>
-     * The caller of this method must already hold the failover mutex.
+     * when the client has vetoed resubscription.
      */
     void markClosed()
     {

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1711375&r1=1711374&r2=1711375&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Thu Oct 29 22:58:48 2015
@@ -27,10 +27,13 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.naming.NamingException;
+import javax.jms.Queue;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.BasicMessageProducer;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.transport.ConnectionException;
 
@@ -49,6 +52,8 @@ public class BrokerClosesClientConnectio
     private boolean _isExternalBroker;
     private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener();
     private Session _session;
+    private MessageConsumer _consumer;
+    private MessageProducer _producer;
 
     @Override
     protected void setUp() throws Exception
@@ -56,8 +61,12 @@ public class BrokerClosesClientConnectio
         super.setUp();
 
         _connection = getConnection();
+        _connection.start();
         _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         _connection.setExceptionListener(_recordingExceptionListener);
+        Queue queue = _session.createTemporaryQueue();
+        _consumer = _session.createConsumer(queue);
+        _producer = _session.createProducer(queue);
 
         _isExternalBroker = isExternalBroker();
     }
@@ -66,13 +75,13 @@ public class BrokerClosesClientConnectio
     {
         final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
 
-        assertConnectionOpen();
+        assertJmsObjectsOpen();
 
         stopBroker();
 
         JMSException exception = _recordingExceptionListener.awaitException(10000);
         assertConnectionCloseWasReported(exception, expectedLinkedException);
-        assertConnectionClosed();
+        assertJmsObjectsClosed();
 
         ensureCanCloseWithoutException();
     }
@@ -86,13 +95,13 @@ public class BrokerClosesClientConnectio
             return;
         }
 
-        assertConnectionOpen();
+        assertJmsObjectsOpen();
 
         killBroker();
 
         JMSException exception = _recordingExceptionListener.awaitException(10000);
         assertConnectionCloseWasReported(exception, expectedLinkedException);
-        assertConnectionClosed();
+        assertJmsObjectsClosed();
 
         ensureCanCloseWithoutException();
     }
@@ -117,14 +126,20 @@ public class BrokerClosesClientConnectio
         assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
     }
 
-    private void assertConnectionClosed()
+    private void assertJmsObjectsClosed()
     {
-        assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed());
+        assertTrue("Connection should be marked as closed", ((AMQConnection) _connection).isClosed());
+        assertTrue("Session should be marked as closed", ((AMQSession) _session).isClosed());
+        assertTrue("Producer should be marked as closed", ((BasicMessageProducer) _producer).isClosed());
+        assertTrue("Consumer should be marked as closed", ((BasicMessageConsumer) _consumer).isClosed());
     }
 
-    private void assertConnectionOpen()
+    private void assertJmsObjectsOpen()
     {
-        assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed());
+        assertFalse("Connection should not be marked as closed", ((AMQConnection) _connection).isClosed());
+        assertFalse("Session should not be marked as closed", ((AMQSession) _session).isClosed());
+        assertFalse("Producer should not be marked as closed", ((BasicMessageProducer) _producer).isClosed());
+        assertFalse("Consumer should not be marked as closed", ((BasicMessageConsumer) _consumer).isClosed());
     }
 
     private final class RecordingExceptionListener implements ExceptionListener
@@ -162,7 +177,7 @@ public class BrokerClosesClientConnectio
         }
     }
 
-    public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException
+    public void testNoDeliveryAfterBrokerClose() throws Exception
     {
 
         Listener listener = new Listener();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to