Repository: qpid-jms
Updated Branches:
  refs/heads/master 9d47d6ac4 -> b6beb0791


don't ack consumption until after onMessage completes when using auto-ack 
sessions


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b6beb079
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b6beb079
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b6beb079

Branch: refs/heads/master
Commit: b6beb0791ea5a8dc5d317092497b52c7e87afa7c
Parents: 1c080fb
Author: Robert Gemmell <rob...@apache.org>
Authored: Thu Jan 15 16:03:08 2015 +0000
Committer: Robert Gemmell <rob...@apache.org>
Committed: Thu Jan 15 16:35:08 2015 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 88 +++++++++---------
 .../java/org/apache/qpid/jms/JmsSession.java    | 10 ++
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  4 +-
 .../qpid/jms/consumer/JmsAutoAckTest.java       | 97 +++++++++++++++++++-
 4 files changed, 151 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 9a0bc32..bce5b44 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -196,7 +196,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         sendPullCommand(0);
 
         try {
-            return copy(ack(this.messageQueue.dequeue(-1)));
+            return copy(ackFromReceive(this.messageQueue.dequeue(-1)));
         } catch (Exception e) {
             throw JmsExceptionSupport.create(e);
         }
@@ -216,7 +216,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
 
         if (timeout > 0) {
             try {
-                return copy(ack(this.messageQueue.dequeue(timeout)));
+                return 
copy(ackFromReceive(this.messageQueue.dequeue(timeout)));
             } catch (InterruptedException e) {
                 throw JmsExceptionSupport.create(e);
             }
@@ -236,7 +236,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         checkMessageListener();
         sendPullCommand(-1);
 
-        return copy(ack(this.messageQueue.dequeueNoWait()));
+        return copy(ackFromReceive(this.messageQueue.dequeueNoWait()));
     }
 
     protected void checkClosed() throws IllegalStateException {
@@ -252,7 +252,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         return envelope.getMessage().copy();
     }
 
-    JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) 
throws JMSException {
+    JmsInboundMessageDispatch ackFromReceive(final JmsInboundMessageDispatch 
envelope) throws JMSException {
         if (envelope != null && envelope.getMessage() != null) {
             JmsMessage message = envelope.getMessage();
             if (message.getAcknowledgeCallback() != null) {
@@ -268,7 +268,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         return envelope;
     }
 
-    private void doAckConsumed(final JmsInboundMessageDispatch envelope) 
throws JMSException {
+    private JmsInboundMessageDispatch doAckConsumed(final 
JmsInboundMessageDispatch envelope) throws JMSException {
         checkClosed();
         try {
             session.acknowledge(envelope, ACK_TYPE.CONSUMED, true);
@@ -276,15 +276,17 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
             session.onException(ex);
             throw ex;
         }
+        return envelope;
     }
 
-    private void doAckDelivered(final JmsInboundMessageDispatch envelope) 
throws JMSException {
+    private JmsInboundMessageDispatch doAckDelivered(final 
JmsInboundMessageDispatch envelope) throws JMSException {
         try {
             session.acknowledge(envelope, ACK_TYPE.DELIVERED, true);
         } catch (JMSException ex) {
             session.onException(ex);
             throw ex;
         }
+        return envelope;
     }
 
     private void doAckReleased(final JmsInboundMessageDispatch envelope) 
throws JMSException {
@@ -325,29 +327,7 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
         }
 
         if (this.messageListener != null && this.started) {
-            session.getExecutor().execute(new Runnable() {
-                @Override
-                public void run() {
-                    JmsInboundMessageDispatch envelope;
-                    while (session.isStarted() && (envelope = 
messageQueue.dequeueNoWait()) != null) {
-                        try {
-                            // TODO - We are currently acking early.  We need 
to ack after onMessage
-                            //        with either a delivered or a consumed 
ack based on the session
-                            //        ack mode.  We also need to check for the 
message having been
-                            //        acked by message.acknowledge() in 
onMessage so we don't do a
-                            //        delivered ack following a real ack in 
the case of client ack
-                            //        mode or a future individual ack mode.
-                            messageListener.onMessage(copy(ack(envelope)));
-                        } catch (Exception e) {
-                            // TODO - We need to handle exception of on 
message with some other
-                            //        ack such as rejected and consider adding 
a redlivery policy
-                            //        to control when we might just poison the 
message with an ack
-                            //        of modified set to not deliverable here.
-                            session.getConnection().onException(e);
-                        }
-                    }
-                }
-            });
+            session.getExecutor().execute(new MessageDeliverTask());
         } else {
             if (availableListener != null) {
                 availableListener.onMessageAvailable(this);
@@ -360,7 +340,7 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
         try {
             this.started = true;
             this.messageQueue.start();
-            drainMessageQueueToListener(); //TODO: this should be handed off 
to the executor.
+            drainMessageQueueToListener();
         } finally {
             lock.unlock();
         }
@@ -385,8 +365,6 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
     }
 
     public void suspendForRollback() throws JMSException {
-        // TODO: this isnt really sufficient if we are in onMessage and there
-        // are previously-scheduled delivery tasks remaining after the 
currently executing one
         stop();
 
         session.getConnection().stopResource(consumerInfo);
@@ -407,19 +385,8 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
     }
 
     void drainMessageQueueToListener() {
-        MessageListener listener = this.messageListener;
-        if (listener != null) {
-            if (!this.messageQueue.isEmpty()) {
-                List<JmsInboundMessageDispatch> drain = 
this.messageQueue.removeAll();
-                for (JmsInboundMessageDispatch envelope : drain) {
-                    try {
-                        listener.onMessage(copy(ack(envelope)));
-                    } catch (Exception e) {
-                        session.getConnection().onException(e);
-                    }
-                }
-                drain.clear();
-            }
+        if (this.messageListener != null && this.started) {
+            session.getExecutor().execute(new MessageDeliverTask());
         }
     }
 
@@ -573,4 +540,35 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
 
         return prefetch;
     }
+
+    private final class MessageDeliverTask implements Runnable {
+        @Override
+        public void run() {
+            JmsInboundMessageDispatch envelope;
+            while (session.isStarted() && (envelope = 
messageQueue.dequeueNoWait()) != null) {
+                try {
+                    JmsMessage copy = null;
+                    if(acknowledgementMode == Session.AUTO_ACKNOWLEDGE) {
+                        copy = copy(doAckDelivered(envelope));
+                    } else {
+                        copy = copy(ackFromReceive(envelope));
+                    }
+                    session.clearSessionRecovered();
+
+                    messageListener.onMessage(copy);
+
+                    if(acknowledgementMode == Session.AUTO_ACKNOWLEDGE && 
!session.isSessionRecovered()) {
+                        doAckConsumed(envelope);
+                    }
+                } catch (Exception e) {
+                    // TODO - We need to handle exception of on message with 
some other
+                    //        ack such as rejected and consider adding a 
redlivery policy
+                    //        to control when we might just poison the message 
with an ack
+                    //        of modified set to not deliverable here.
+                    session.getConnection().onException(e);
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 4efd45d..fa6ecc6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -96,6 +96,7 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
     private final AtomicLong consumerIdGenerator = new AtomicLong();
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private JmsLocalTransactionContext transactionContext;
+    private boolean sessionRecovered;
 
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int 
acknowledgementMode) throws JMSException {
         this.connection = connection;
@@ -151,6 +152,7 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
         }
 
         this.connection.recover(getSessionId());
+        sessionRecovered = true;
     }
 
     @Override
@@ -1012,4 +1014,12 @@ public class JmsSession implements Session, 
QueueSession, TopicSession, JmsMessa
     public JmsLocalTransactionContext getTransactionContext() {
         return transactionContext;
     }
+
+    boolean isSessionRecovered() {
+        return sessionRecovered;
+    }
+
+    void clearSessionRecovered() {
+        sessionRecovered = false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 8d153c6..d851630 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -271,8 +271,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
             sendFlowIfNeeded();
         } else if (ackType.equals(ACK_TYPE.CONSUMED)) {
-            // A Consumer may not always send a delivered ACK so we need to 
check to
-            // ensure we don't add to much credit to the link.
+            // A Consumer may not always send a DELIVERED ack so we need to
+            // check to ensure we don't add too much credit to the link.
             if (isPresettle() || delivered.remove(envelope) == null) {
                 sendFlowIfNeeded();
             }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
index db2a104..e3f3faf 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java
@@ -17,9 +17,14 @@
 package org.apache.qpid.jms.consumer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -97,4 +102,94 @@ public class JmsAutoAckTest extends AmqpTestSupport {
             }
         }));
     }
-}
+
+    @Test(timeout = 600000)
+    public void testRecoverInOnMessage() throws Exception {
+        connection = createAmqpConnection();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendMessages(connection, queue, 2);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AutoAckRecoverMsgListener listener = new 
AutoAckRecoverMsgListener(latch, session);
+        consumer.setMessageListener(listener);
+
+        connection.start();
+
+        assertTrue("Timed out waiting for async listener", latch.await(500, 
TimeUnit.SECONDS));
+        assertFalse("Test failed in listener, consult logs", 
listener.getFailed());
+    }
+
+    private static class AutoAckRecoverMsgListener implements MessageListener {
+        final Session session;
+        final CountDownLatch latch;
+        private boolean seenFirstMessage = false;
+        private boolean seenSecondMessage = false;
+        private boolean complete = false;
+        private boolean failed = false;
+
+        public AutoAckRecoverMsgListener(CountDownLatch latch, Session 
session) {
+            this.latch = latch;
+            this.session = session;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                int msgNumProperty = message.getIntProperty(MESSAGE_NUMBER);
+
+                if(complete ){
+                    LOG.info("Test already complete, ignoring delivered 
message: " + msgNumProperty);
+                }
+
+                if (msgNumProperty == 1) {
+                    if (!seenFirstMessage) {
+                        LOG.info("Received first message.");
+                        seenFirstMessage = true;
+                    } else {
+                        LOG.error("Received first message again.");
+                        complete(true);
+                    }
+                } else {
+                    if (msgNumProperty != 2) {
+                        LOG.error("Received unexpected message: " + 
msgNumProperty);
+                        complete(true);
+                        return;
+                    }
+
+                    if(!seenSecondMessage){
+                        seenSecondMessage = true;
+                        LOG.info("Received second message. Now calling 
recover()");
+                        session.recover();
+                    } else {
+                        LOG.info("Received second message again as expected.");
+                        if(message.getJMSRedelivered()) {
+                            LOG.info("Message was marked redelivered.");
+                            complete(false);
+                        } else {
+                            LOG.error("Message was not marked redelivered.");
+                            complete(true);
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.error("Exception caught in listener", e);
+                complete(true);
+            }
+        }
+
+        public boolean getFailed() {
+            return failed;
+        }
+
+        private void complete(boolean fail) {
+            failed = fail;
+            complete = true;
+            latch.countDown();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to