This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new 9b7bc81 QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages 9b7bc81 is described below commit 9b7bc819f68d4d41bed9f2a27887decfdc675ff7 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Wed Jul 15 17:38:25 2020 +0100 QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages --- .../qpid/jms/provider/amqp/AmqpConsumer.java | 16 +++-- .../jms/integration/ConsumerIntegrationTest.java | 69 +++++++++++++++++++++- .../jms/integration/SessionIntegrationTest.java | 44 ++++++++++++++ 3 files changed, 122 insertions(+), 7 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 202a9e7..c5990e5 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -55,6 +55,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); + private static final int INDIVIDUAL_ACKNOWLEDGE = 101; + protected final AmqpSession session; protected final int acknowledgementMode; protected AsyncResult stopRequest; @@ -86,7 +88,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } private void acknowledgeUndeliveredRecoveredMessages() { - if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { + if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE + || acknowledgementMode == Session.AUTO_ACKNOWLEDGE + || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE + || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) { // Send dispositions for any messages which were previously delivered and // session recovered, but were then not delivered again afterwards. Delivery delivery = getEndpoint().head(); @@ -440,9 +445,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver envelope.getMessage().getFacade().getRedeliveryCount() + 1); envelope.setEnqueueFirst(true); envelope.setDelivered(false); - if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { - envelope.setRecovered(true); - } + envelope.setRecovered(true); redispatchList.add(envelope); } @@ -458,6 +461,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver while (reverseIterator.hasPrevious()) { deliver(reverseIterator.previous()); } + + if(deferredClose) { + acknowledgeUndeliveredRecoveredMessages(); + tryCompleteDeferredClose(); + } } /** diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index 9fcec81..5d1572b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -87,6 +87,8 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class); + private static final int INDIVIDUAL_ACKNOWLEDGE = 101; + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); @Test(timeout = 20000) @@ -1172,6 +1174,52 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { @Test(timeout=20000) public void testMessageListenerClosesItsConsumer() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(false, false, Session.AUTO_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerAfterRecoverAutoAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(true, false, Session.AUTO_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerAfterRecoverClientAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(true, false, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerAfterRecoverDupsOk() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(true, false, Session.DUPS_OK_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerAfterRecoverIndividualAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(true, false, INDIVIDUAL_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerBeforeRecoverAutoAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(false, true, Session.AUTO_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerBeforeRecoverClientAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(false, true, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerBeforeRecoverDupsOk() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(false, true, Session.DUPS_OK_ACKNOWLEDGE); + } + + @Test(timeout=20000) + public void testMessageListenerClosesItsConsumerBeforeRecoverIndividualAck() throws Exception { + doMessageListenerClosesItsConsumerTestImpl(false, true, INDIVIDUAL_ACKNOWLEDGE); + } + + private void doMessageListenerClosesItsConsumerTestImpl(boolean recoverAfterClose, boolean recoverBeforeClose, int ackMode) throws Exception { + assertFalse("Cant recover a transacted session", ackMode == Session.SESSION_TRANSACTED); + final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch exceptionListenerFired = new CountDownLatch(1); final AtomicReference<Throwable> error = new AtomicReference<>(); @@ -1190,7 +1238,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { testPeer.expectBegin(); - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session = connection.createSession(false, ackMode); Queue destination = session.createQueue(getTestName()); connection.start(); @@ -1199,15 +1247,30 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { MessageConsumer consumer = session.createConsumer(destination); - testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1))); - testPeer.expectDisposition(true, new AcceptedMatcher()); + if(recoverBeforeClose) { + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true))); + } else if(recoverAfterClose) { + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1))); + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true))); + } else { + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1))); + testPeer.expectDisposition(true, new AcceptedMatcher()); + } testPeer.expectDetach(true, true, true); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message m) { try { + if(recoverBeforeClose) { + session.recover(); + } + consumer.close(); + + if(recoverAfterClose) { + session.recover(); + } } catch (Throwable t) { error.set(t); LOG.error("Unexpected error during close", t); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 0e19c07..4222a9a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -2266,6 +2266,50 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCloseConsumerWithUnackedClientAckMessagesThenRecoverSession() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE); + + int msgCount = 2; + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false, + Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, true); + + Queue destination = session.createQueue(getTestName()); + MessageConsumer consumer = session.createConsumer(destination); + + TextMessage receivedTextMessage = null; + assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number", 1, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number", 2, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount))); + + consumer.close(); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1); + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2); + testPeer.expectDetach(true, true, true); + + session.recover(); + + // Verify the expectations happen in response to the recover() and not the following close(). + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 20000) public void testRecoveredClientAckSessionWithDurableSubscriber() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org