Repository: qpid-jms
Updated Branches:
  refs/heads/master 405341282 -> 6d1adfd22


QPIDJMS-244 Ensure async delivery allows acks before close

Wait for any in-progress async message delivery to complete when closing,
so that in auto-ack mode the client acknowledges the processed message
before the connection is closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6d1adfd2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6d1adfd2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6d1adfd2

Branch: refs/heads/master
Commit: 6d1adfd227dcbe3993cdc753ed202c53ac2a2f77
Parents: 4053412
Author: Timothy Bish <[email protected]>
Authored: Fri Jan 20 16:46:04 2017 -0500
Committer: Timothy Bish <[email protected]>
Committed: Fri Jan 20 16:46:04 2017 -0500

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 170 ++++++++++++-------
 .../java/org/apache/qpid/jms/JmsSession.java    |   8 +
 .../integration/ConsumerIntegrationTest.java    | 142 ++++++++++++++++
 3 files changed, 256 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b06270f..ff19388 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -64,6 +64,7 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     protected volatile JmsMessageAvailableListener availableListener;
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
+    protected final Lock dispatchLock = new ReentrantLock();
     protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
     protected final AtomicReference<Throwable> failureCause = new 
AtomicReference<>();
     protected final MessageDeliverTask deliveryTask = new MessageDeliverTask();
@@ -428,7 +429,6 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     }
 
     private JmsInboundMessageDispatch doAckConsumed(final 
JmsInboundMessageDispatch envelope) throws JMSException {
-        checkClosed();
         try {
             session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
         } catch (JMSException ex) {
@@ -518,8 +518,10 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     public void start() {
         lock.lock();
         try {
-            this.messageQueue.start();
-            drainMessageQueueToListener();
+            if (!messageQueue.isRunning()) {
+                this.messageQueue.start();
+                drainMessageQueueToListener();
+            }
         } finally {
             lock.unlock();
         }
@@ -530,6 +532,7 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     }
 
     private void stop(boolean closeMessageQueue) {
+        dispatchLock.lock();
         lock.lock();
         try {
             if (closeMessageQueue) {
@@ -539,6 +542,7 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
             }
         } finally {
             lock.unlock();
+            dispatchLock.unlock();
         }
     }
 
@@ -562,12 +566,6 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
         startConsumerResource();
     }
 
-    void drainMessageQueueToListener() {
-        if (messageListener != null && messageQueue.isRunning()) {
-            session.getDispatcherExecutor().execute(deliveryTask);
-        }
-    }
-
     /**
      * @return the id
      */
@@ -592,15 +590,19 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
     public void setMessageListener(MessageListener listener) throws 
JMSException {
         checkClosed();
 
-        this.messageListener = listener;
-        if (listener != null) {
-            consumerInfo.setListener(true);
-            if (isPullConsumer()){
-                startConsumerResource();
+        dispatchLock.lock();
+        try {
+            messageListener = listener;
+            consumerInfo.setListener(listener != null);
+
+            if (listener != null) {
+                if (isPullConsumer()){
+                    startConsumerResource();
+                }
+                drainMessageQueueToListener();
             }
-            drainMessageQueueToListener();
-        } else {
-            consumerInfo.setListener(false);
+        } finally {
+            dispatchLock.unlock();
         }
     }
 
@@ -713,61 +715,101 @@ public class JmsMessageConsumer implements 
AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
-    private final class MessageDeliverTask implements Runnable {
-        @Override
-        public void run() {
-            JmsInboundMessageDispatch envelope;
-            while (session.isStarted() && (envelope = 
messageQueue.dequeueNoWait()) != null) {
-                try {
-                    JmsMessage copy = null;
-
-                    if (consumeExpiredMessage(envelope)) {
-                        LOG.trace("{} filtered expired message: {}", 
getConsumerId(), envelope);
-                        doAckExpired(envelope);
-                    } else if (redeliveryExceeded(envelope)) {
-                        LOG.trace("{} filtered message with excessive 
redelivery count: {}", getConsumerId(), envelope);
-                        doAckUndeliverable(envelope);
+    private void drainMessageQueueToListener() {
+        if (messageListener != null && session.isStarted() && 
messageQueue.isRunning()) {
+            session.getDispatcherExecutor().execute(new 
BoundedMessageDeliverTask(messageQueue.size()));
+        }
+    }
+
+    private boolean deliverNextPending() {
+        if (session.isStarted() && messageQueue.isRunning() && messageListener 
!= null) {
+            dispatchLock.lock();
+            try {
+                JmsInboundMessageDispatch envelope = 
messageQueue.dequeueNoWait();
+                if (envelope == null) {
+                    return false;
+                }
+
+                JmsMessage copy = null;
+
+                if (consumeExpiredMessage(envelope)) {
+                    LOG.trace("{} filtered expired message: {}", 
getConsumerId(), envelope);
+                    doAckExpired(envelope);
+                } else if (redeliveryExceeded(envelope)) {
+                    LOG.trace("{} filtered message with excessive redelivery 
count: {}", getConsumerId(), envelope);
+                    doAckUndeliverable(envelope);
+                } else {
+                    boolean deliveryFailed = false;
+                    boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE ||
+                                              acknowledgementMode == 
Session.DUPS_OK_ACKNOWLEDGE;
+                    if (autoAckOrDupsOk) {
+                        copy = copy(doAckDelivered(envelope));
                     } else {
-                        boolean deliveryFailed = false;
-                        boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE ||
-                                                  acknowledgementMode == 
Session.DUPS_OK_ACKNOWLEDGE;
-                        if (autoAckOrDupsOk) {
-                            copy = copy(doAckDelivered(envelope));
-                        } else {
-                            copy = copy(ackFromReceive(envelope));
-                        }
-                        session.clearSessionRecovered();
+                        copy = copy(ackFromReceive(envelope));
+                    }
+                    session.clearSessionRecovered();
 
-                        try {
-                            messageListener.onMessage(copy);
-                        } catch (RuntimeException rte) {
-                            deliveryFailed = true;
-                        }
+                    try {
+                        messageListener.onMessage(copy);
+                    } catch (RuntimeException rte) {
+                        deliveryFailed = true;
+                    }
 
-                        if (autoAckOrDupsOk && !session.isSessionRecovered()) {
-                            if (!deliveryFailed) {
-                                doAckConsumed(envelope);
-                            } else {
-                                doAckReleased(envelope);
-                            }
+                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
+                        if (!deliveryFailed) {
+                            doAckConsumed(envelope);
+                        } else {
+                            doAckReleased(envelope);
                         }
                     }
-                } catch (Exception e) {
-                    // TODO - There are two cases where we can get an error 
here, one being
-                    //        and error returned from the attempted ACK that 
was sent and the
-                    //        other being an error while attempting to copy 
the incoming message.
-                    //        We need to decide how to respond to these.
-                    session.getConnection().onException(e);
-                } finally {
-                    if (isPullConsumer()) {
-                        try {
-                            startConsumerResource();
-                        } catch (JMSException e) {
-                            LOG.error("Exception during credit replenishment 
for consumer listener {}", getConsumerId(), e);
-                        }
+                }
+            } catch (Exception e) {
+                // TODO - There are two cases where we can get an error here, 
one being
+                //        and error returned from the attempted ACK that was 
sent and the
+                //        other being an error while attempting to copy the 
incoming message.
+                //        We need to decide how to respond to these.
+                session.getConnection().onException(e);
+            } finally {
+                dispatchLock.unlock();
+
+                if (isPullConsumer()) {
+                    try {
+                        startConsumerResource();
+                    } catch (JMSException e) {
+                        LOG.error("Exception during credit replenishment for 
consumer listener {}", getConsumerId(), e);
                     }
                 }
             }
         }
+
+        return !messageQueue.isEmpty();
+    }
+
+    private final class BoundedMessageDeliverTask implements Runnable {
+
+        private final int deliveryCount;
+
+        public BoundedMessageDeliverTask(int deliveryCount) {
+            this.deliveryCount = deliveryCount;
+        }
+
+        @Override
+        public void run() {
+            int current = 0;
+
+            while (session.isStarted() && messageQueue.isRunning() && 
current++ < deliveryCount) {
+                if (!deliverNextPending()) {
+                    return;  // Another task already drained the queue.
+                }
+            }
+        }
+    }
+
+    private final class MessageDeliverTask implements Runnable {
+
+        @Override
+        public void run() {
+            deliverNextPending();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 49729ec..3b34674 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -185,8 +185,15 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             throw new javax.jms.IllegalStateException("Cannot call recover() 
on a transacted session");
         }
 
+        boolean wasStarted = isStarted();
+        stop();
+
         connection.recover(getSessionId());
         sessionRecovered = true;
+
+        if (wasStarted) {
+            start();
+        }
     }
 
     @Override
@@ -1184,6 +1191,7 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
         return transactionContext;
     }
 
+
     boolean isSessionRecovered() {
         return sessionRecovered;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 59eec0a..d1cab10 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1162,4 +1162,146 @@ public class ConsumerIntegrationTest extends 
QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testConsumerCloseWaitsForAsyncDeliveryToComplete() throws 
Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count 
remaining: " + latch.getCount(), await);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testSessionCloseWaitsForAsyncDeliveryToComplete() throws 
Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count 
remaining: " + latch.getCount(), await);
+
+            testPeer.expectEnd();
+            session.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConnectionCloseWaitsForAsyncDeliveryToComplete() throws 
Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count 
remaining: " + latch.getCount(), await);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to