Repository: qpid-jms
Updated Branches:
  refs/heads/master b48a83d87 -> 32fc7d20c


QPIDJMS-218: prevent CME during remote session closure with multiple 
consumers/producers, add tests (for local case also)


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/32fc7d20
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/32fc7d20
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/32fc7d20

Branch: refs/heads/master
Commit: 32fc7d20cb0bc1627cb7f21438aa18715fb63e57
Parents: b48a83d
Author: Robert Gemmell <[email protected]>
Authored: Thu Nov 3 14:36:45 2016 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Thu Nov 3 14:36:45 2016 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpSession.java     |   8 +-
 .../jms/integration/SessionIntegrationTest.java | 119 ++++++++++++++++---
 2 files changed, 109 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32fc7d20/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index c09e896..efe52d6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -16,7 +16,9 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -227,11 +229,13 @@ public class AmqpSession extends 
AmqpAbstractResource<JmsSessionInfo, Session> i
 
     @Override
     public void handleResourceClosure(AmqpProvider provider, Exception error) {
-        for (AmqpConsumer consumer : consumers.values()) {
+        List<AmqpConsumer> consumerList = new ArrayList<>(consumers.values());
+        for (AmqpConsumer consumer : consumerList) {
             consumer.locallyClosed(provider, error);
         }
 
-        for (AmqpProducer producer : producers.values()) {
+        List<AmqpProducer> producerList = new ArrayList<>(producers.values());
+        for (AmqpProducer producer : producerList) {
             producer.locallyClosed(provider, error);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32fc7d20/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index bbb2ec7..2922b0f 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1598,8 +1598,45 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testLocallyCloseSessionWithConsumersAndProducers() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create some consumers, don't give them any messages
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            Queue queue = session.createQueue("myQueue");
+            session.createConsumer(queue);
+            session.createConsumer(queue);
+
+            // Create some producers
+            testPeer.expectSenderAttach();
+            testPeer.expectSenderAttach();
+
+            session.createProducer(queue);
+            session.createProducer(queue);
+
+            //Expect the session close
+            testPeer.expectEnd();
+
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     @Repeat(repetitions = 1)
-    public void testRemotelyEndSessionWithProducer() throws Exception {
+    public void testRemotelyEndSessionWithProducers() throws Exception {
         final String BREAD_CRUMB = "ErrorMessage";
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -1614,17 +1651,22 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.expectBegin();
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
 
-            // Create a producer, then remotely end the session afterwards.
+            // Create a producer
+            testPeer.expectSenderAttach();
+            final MessageProducer producer = session.createProducer(queue);
+            assertNotNull(producer);
+
+            // Create a second producer, then remotely end the session 
afterwards.
             testPeer.expectSenderAttach();
             testPeer.remotelyEndLastOpenedSession(true, 50, 
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
 
-            Queue queue = session.createQueue("myQueue");
-            final MessageProducer producer = session.createProducer(queue);
+            final MessageProducer producer2 = session.createProducer(queue);
 
             testPeer.waitForAllHandlersToComplete(1000);
 
-            // Verify the producer gets marked closed
+            // Verify the producers get marked closed
             assertTrue("producer never closed.", Wait.waitFor(new 
Wait.Condition() {
                 @Override
                 public boolean isSatisified() throws Exception {
@@ -1641,7 +1683,25 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
                     }
                     return false;
                 }
-            }, 15000, 10));
+            }, 6000, 10));
+
+            assertTrue("producer2 never closed.", Wait.waitFor(new 
Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    try {
+                        producer2.getDestination();
+                    } catch (IllegalStateException jmsise) {
+                        if (jmsise.getCause() != null) {
+                            String message = jmsise.getCause().getMessage();
+                            return 
message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
+                                   message.contains(BREAD_CRUMB);
+                        } else {
+                            return false;
+                        }
+                    }
+                    return false;
+                }
+            }, 6000, 10));
 
             assertTrue("Session closed callback didn't trigger", 
sessionClosed.await(10, TimeUnit.SECONDS));
 
@@ -1655,9 +1715,10 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
                 assertTrue(message.contains(BREAD_CRUMB));
             }
 
-            // Try closing it explicitly, should effectively no-op in client.
-            // The test peer will throw during close if it sends anything.
+            // Try closing producers explicitly, should effectively no-op in 
client.
+            // The test peer will throw during close if it sends anything 
unexpected.
             producer.close();
+            producer2.close();
 
             testPeer.expectClose();
             connection.close();
@@ -1772,7 +1833,7 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void testRemotelyEndSessionWithConsumer() throws Exception {
+    public void testRemotelyEndSessionWithConsumers() throws Exception {
         final String BREAD_CRUMB = "ErrorMessage";
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -1787,16 +1848,23 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.expectBegin();
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
 
-            // Create a consumer, then remotely end the session afterwards.
+            // Create a consumer
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlow();
-            testPeer.remotelyEndLastOpenedSession(true, 0, 
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
 
-            Queue queue = session.createQueue("myQueue");
             final MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer);
+
+            // Create a second consumer, then remotely end the session 
afterwards.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.remotelyEndLastOpenedSession(true, 0, 
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
 
-            // Verify the consumer gets marked closed
+            final MessageConsumer consumer2 = session.createConsumer(queue);
+
+            // Verify the consumers get marked closed
             testPeer.waitForAllHandlersToComplete(1000);
             assertTrue("consumer never closed.", Wait.waitFor(new 
Wait.Condition() {
                 @Override
@@ -1814,7 +1882,25 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
                     }
                     return false;
                 }
-            }, 15000, 10));
+            }, 6000, 10));
+
+            assertTrue("consumer2 never closed.", Wait.waitFor(new 
Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    try {
+                        consumer2.getMessageListener();
+                    } catch (IllegalStateException jmsise) {
+                        if (jmsise.getCause() != null) {
+                            String message = jmsise.getCause().getMessage();
+                            return 
message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
+                                   message.contains(BREAD_CRUMB);
+                        } else {
+                            return false;
+                        }
+                    }
+                    return false;
+                }
+            }, 6000, 10));
 
             assertTrue("Session closed callback didn't trigger", 
sessionClosed.await(10, TimeUnit.SECONDS));
 
@@ -1828,9 +1914,10 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
                 assertTrue(message.contains(BREAD_CRUMB));
             }
 
-            // Try closing it explicitly, should effectively no-op in client.
-            // The test peer will throw during close if it sends anything.
+            // Try closing consumers explicitly, should effectively no-op in 
client.
+            // The test peer will throw during close if it sends anything 
unexpected.
             consumer.close();
+            consumer2.close();
 
             testPeer.expectClose();
             connection.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to