Repository: qpid-jms Updated Branches: refs/heads/master b48a83d87 -> 32fc7d20c
QPIDJMS-218: prevent CME during remote session closure with multiple consumers/producers, add tests (for local case also) Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/32fc7d20 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/32fc7d20 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/32fc7d20 Branch: refs/heads/master Commit: 32fc7d20cb0bc1627cb7f21438aa18715fb63e57 Parents: b48a83d Author: Robert Gemmell <[email protected]> Authored: Thu Nov 3 14:36:45 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Thu Nov 3 14:36:45 2016 +0000 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpSession.java | 8 +- .../jms/integration/SessionIntegrationTest.java | 119 ++++++++++++++++--- 2 files changed, 109 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32fc7d20/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index c09e896..efe52d6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -16,7 +16,9 @@ */ package org.apache.qpid.jms.provider.amqp; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -227,11 +229,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i @Override public void handleResourceClosure(AmqpProvider provider, Exception error) { - for (AmqpConsumer consumer : consumers.values()) { + List<AmqpConsumer> consumerList = new ArrayList<>(consumers.values()); + for (AmqpConsumer consumer : consumerList) { consumer.locallyClosed(provider, error); } - for (AmqpProducer producer : producers.values()) { + List<AmqpProducer> producerList = new ArrayList<>(producers.values()); + for (AmqpProducer producer : producerList) { producer.locallyClosed(provider, error); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32fc7d20/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index bbb2ec7..2922b0f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -1598,8 +1598,45 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testLocallyCloseSessionWithConsumersAndProducers() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create some consumers, don't give them any messages + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + + Queue queue = session.createQueue("myQueue"); + session.createConsumer(queue); + session.createConsumer(queue); + + // Create some producers + testPeer.expectSenderAttach(); + testPeer.expectSenderAttach(); + + session.createProducer(queue); + session.createProducer(queue); + + //Expect the session close + testPeer.expectEnd(); + + session.close(); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 20000) @Repeat(repetitions = 1) - public void testRemotelyEndSessionWithProducer() throws Exception { + public void testRemotelyEndSessionWithProducers() throws Exception { final String BREAD_CRUMB = "ErrorMessage"; try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -1614,17 +1651,22 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); - // Create a producer, then remotely end the session afterwards. + // Create a producer + testPeer.expectSenderAttach(); + final MessageProducer producer = session.createProducer(queue); + assertNotNull(producer); + + // Create a second producer, then remotely end the session afterwards. testPeer.expectSenderAttach(); testPeer.remotelyEndLastOpenedSession(true, 50, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); - Queue queue = session.createQueue("myQueue"); - final MessageProducer producer = session.createProducer(queue); + final MessageProducer producer2 = session.createProducer(queue); testPeer.waitForAllHandlersToComplete(1000); - // Verify the producer gets marked closed + // Verify the producers get marked closed assertTrue("producer never closed.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { @@ -1641,7 +1683,25 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } return false; } - }, 15000, 10)); + }, 6000, 10)); + + assertTrue("producer2 never closed.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + producer2.getDestination(); + } catch (IllegalStateException jmsise) { + if (jmsise.getCause() != null) { + String message = jmsise.getCause().getMessage(); + return message.contains(AmqpError.RESOURCE_DELETED.toString()) && + message.contains(BREAD_CRUMB); + } else { + return false; + } + } + return false; + } + }, 6000, 10)); assertTrue("Session closed callback didn't trigger", sessionClosed.await(10, TimeUnit.SECONDS)); @@ -1655,9 +1715,10 @@ public class SessionIntegrationTest extends QpidJmsTestCase { assertTrue(message.contains(BREAD_CRUMB)); } - // Try closing it explicitly, should effectively no-op in client. - // The test peer will throw during close if it sends anything. + // Try closing producers explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything unexpected. producer.close(); + producer2.close(); testPeer.expectClose(); connection.close(); @@ -1772,7 +1833,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) - public void testRemotelyEndSessionWithConsumer() throws Exception { + public void testRemotelyEndSessionWithConsumers() throws Exception { final String BREAD_CRUMB = "ErrorMessage"; try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -1787,16 +1848,23 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); - // Create a consumer, then remotely end the session afterwards. + // Create a consumer testPeer.expectReceiverAttach(); testPeer.expectLinkFlow(); - testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); - Queue queue = session.createQueue("myQueue"); final MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer); + + // Create a second consumer, then remotely end the session afterwards. + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); - // Verify the consumer gets marked closed + final MessageConsumer consumer2 = session.createConsumer(queue); + + // Verify the consumers get marked closed testPeer.waitForAllHandlersToComplete(1000); assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() { @Override @@ -1814,7 +1882,25 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } return false; } - }, 15000, 10)); + }, 6000, 10)); + + assertTrue("consumer2 never closed.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + consumer2.getMessageListener(); + } catch (IllegalStateException jmsise) { + if (jmsise.getCause() != null) { + String message = jmsise.getCause().getMessage(); + return message.contains(AmqpError.RESOURCE_DELETED.toString()) && + message.contains(BREAD_CRUMB); + } else { + return false; + } + } + return false; + } + }, 6000, 10)); assertTrue("Session closed callback didn't trigger", sessionClosed.await(10, TimeUnit.SECONDS)); @@ -1828,9 +1914,10 @@ public class SessionIntegrationTest extends QpidJmsTestCase { assertTrue(message.contains(BREAD_CRUMB)); } - // Try closing it explicitly, should effectively no-op in client. - // The test peer will throw during close if it sends anything. + // Try closing consumers explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything unexpected. consumer.close(); + consumer2.close(); testPeer.expectClose(); connection.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
