This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b7bc81  QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
9b7bc81 is described below

commit 9b7bc819f68d4d41bed9f2a27887decfdc675ff7
Author: Robbie Gemmell <rob...@apache.org>
AuthorDate: Wed Jul 15 17:38:25 2020 +0100

    QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
---
 .../qpid/jms/provider/amqp/AmqpConsumer.java       | 16 +++--
 .../jms/integration/ConsumerIntegrationTest.java   | 69 +++++++++++++++++++++-
 .../jms/integration/SessionIntegrationTest.java    | 44 ++++++++++++++
 3 files changed, 122 insertions(+), 7 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 202a9e7..c5990e5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -55,6 +55,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConsumer.class);
 
+    private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
     protected final AmqpSession session;
     protected final int acknowledgementMode;
     protected AsyncResult stopRequest;
@@ -86,7 +88,10 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private void acknowledgeUndeliveredRecoveredMessages() {
-        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
+                || acknowledgementMode == Session.AUTO_ACKNOWLEDGE
+                    || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE
+                        || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) {
             // Send dispositions for any messages which were previously 
delivered and
             // session recovered, but were then not delivered again afterwards.
             Delivery delivery = getEndpoint().head();
@@ -440,9 +445,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
                     envelope.getMessage().getFacade().getRedeliveryCount() + 
1);
                 envelope.setEnqueueFirst(true);
                 envelope.setDelivered(false);
-                if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
-                    envelope.setRecovered(true);
-                }
+                envelope.setRecovered(true);
 
                 redispatchList.add(envelope);
             }
@@ -458,6 +461,11 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
         while (reverseIterator.hasPrevious()) {
             deliver(reverseIterator.previous());
         }
+
+        if(deferredClose) {
+            acknowledgeUndeliveredRecoveredMessages();
+            tryCompleteDeferredClose();
+        }
     }
 
     /**
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 9fcec81..5d1572b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -87,6 +87,8 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerIntegrationTest.class);
 
+    private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
     private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
 
     @Test(timeout = 20000)
@@ -1172,6 +1174,52 @@ public class ConsumerIntegrationTest extends 
QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testMessageListenerClosesItsConsumer() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, false, 
Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverAutoAck() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, 
Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverClientAck() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, 
Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverDupsOk() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, 
Session.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void 
testMessageListenerClosesItsConsumerAfterRecoverIndividualAck() throws 
Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, 
INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverAutoAck() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, 
Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverClientAck() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, 
Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverDupsOk() 
throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, 
Session.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void 
testMessageListenerClosesItsConsumerBeforeRecoverIndividualAck() throws 
Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, 
INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    private void doMessageListenerClosesItsConsumerTestImpl(boolean 
recoverAfterClose, boolean recoverBeforeClose, int ackMode) throws Exception {
+        assertFalse("Cant recover a transacted session", ackMode == 
Session.SESSION_TRANSACTED);
+
         final CountDownLatch latch = new CountDownLatch(1);
         final CountDownLatch exceptionListenerFired = new CountDownLatch(1);
         final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -1190,7 +1238,7 @@ public class ConsumerIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.expectBegin();
 
-            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            final Session session = connection.createSession(false, ackMode);
             Queue destination = session.createQueue(getTestName());
             connection.start();
 
@@ -1199,15 +1247,30 @@ public class ConsumerIntegrationTest extends 
QpidJmsTestCase {
 
             MessageConsumer consumer = session.createConsumer(destination);
 
-            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH 
-1)));
-            testPeer.expectDisposition(true, new AcceptedMatcher());
+            if(recoverBeforeClose) {
+                testPeer.expectDisposition(true, new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+            } else if(recoverAfterClose) {
+                testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH 
-1)));
+                testPeer.expectDisposition(true, new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+            } else {
+                testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH 
-1)));
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
             testPeer.expectDetach(true, true, true);
 
             consumer.setMessageListener(new MessageListener() {
                 @Override
                 public void onMessage(Message m) {
                     try {
+                        if(recoverBeforeClose) {
+                            session.recover();
+                        }
+
                         consumer.close();
+
+                        if(recoverAfterClose) {
+                            session.recover();
+                        }
                     } catch (Throwable t) {
                         error.set(t);
                         LOG.error("Unexpected error during close", t);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 0e19c07..4222a9a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -2266,6 +2266,50 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void 
testCloseConsumerWithUnackedClientAckMessagesThenRecoverSession() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = 
connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+            int msgCount = 2;
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), msgCount, false, false,
+                    
Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, 
true);
+
+            Queue destination = session.createQueue(getTestName());
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            TextMessage receivedTextMessage = null;
+            assertNotNull("Expected a message", receivedTextMessage = 
(TextMessage) consumer.receive(3000));
+            assertEquals("Unexpected delivery number", 1,  
receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+            assertNotNull("Expected a message", receivedTextMessage = 
(TextMessage) consumer.receive(3000));
+            assertEquals("Unexpected delivery number", 2,  
receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+
+            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH 
- msgCount)));
+
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDisposition(true, new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
+            testPeer.expectDisposition(true, new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
+            testPeer.expectDetach(true, true, true);
+
+            session.recover();
+
+            // Verify the expectations happen in response to the recover() and 
not the following close().
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRecoveredClientAckSessionWithDurableSubscriber() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer, 
false, "?jms.clientID=myClientId", null, null, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to