Repository: qpid-jms Updated Branches: refs/heads/master 952de60ae -> 2a201375f
QPIDJMS-221 Ensure pending consumer requests are unblocked on close When the consumer is closed, or its parent session or connection is closed, it should unblock any pending drain or stop requests. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2a201375 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2a201375 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2a201375 Branch: refs/heads/master Commit: 2a201375ffeea39da43db811eb57bc293fb8d4b4 Parents: 952de60 Author: Timothy Bish <[email protected]> Authored: Fri Nov 11 17:30:27 2016 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Nov 11 17:30:27 2016 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 12 +++ .../ZeroPrefetchIntegrationTest.java | 85 ++++++++++++++++++++ 2 files changed, 97 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2a201375/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 30e1cef..02d9b47 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -572,6 +572,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver JmsConsumerInfo consumerInfo = getResourceInfo(); subTracker.consumerRemoved(consumerInfo); + + // When closed we need to release any pending tasks to avoid blocking + + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + + if (pullRequest != 
null) { + pullRequest.onSuccess(); + pullRequest = null; + } } //----- Inner classes used in message pull operations --------------------// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2a201375/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java index 29a1d46..24e133b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java @@ -25,7 +25,10 @@ import static org.junit.Assert.assertTrue; import java.util.Date; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.Message; @@ -89,6 +92,9 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase { assertTrue(m instanceof TextMessage); assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText()); + testPeer.expectClose(); + connection.close(); + testPeer.waitForAllHandlersToComplete(3000); } } @@ -123,6 +129,9 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase { assertTrue(m instanceof TextMessage); assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText()); + testPeer.expectClose(); + connection.close(); + testPeer.waitForAllHandlersToComplete(3000); } } @@ -187,4 +196,80 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout=40000) + public void testZeroPrefetchConsumerReceiveUnblockedOnSessionClose() throws Exception { + 
doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(0); + } + + @Test(timeout=40000) + public void testZeroPrefetchConsumerReceiveTimedUnblockedOnSessionClose() throws Exception { + doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(1); + } + + @Test(timeout=40000) + public void testZeroPrefetchConsumerReceiveNoWaitUnblockedOnSessionClose() throws Exception { + doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(-1); + } + + public void doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(final int timeout) throws Exception { + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Create a connection with zero prefetch + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Expected the consumer to attach but NOT send credit + testPeer.expectReceiverAttach(); + + final MessageConsumer consumer = session.createConsumer(queue); + + // Expect that once receive is called, it drains with 1 credit, don't answer it + if (timeout < 0) { + testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE)); + } else { + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE)); + } + + final AtomicBoolean error = new AtomicBoolean(false); + final CountDownLatch done = new CountDownLatch(1); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(new Runnable() { + + @Override + public void run() { + try { + if (timeout < 0) { + consumer.receiveNoWait(); + } else if (timeout == 0) { + consumer.receive(); + } else { + consumer.receive(10000); + } + } catch (Exception ex) { + error.set(true); + } finally { + done.countDown(); + } + } + }); + + testPeer.waitForAllHandlersToComplete(3000); + testPeer.expectEnd(); + testPeer.expectClose(); + + session.close(); + + assertTrue("Consumer did not 
unblock", done.await(10, TimeUnit.SECONDS)); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
