Repository: qpid-jms Updated Branches: refs/heads/master 9d47d6ac4 -> b6beb0791
dont ack consumption until after onMessage completes when using auto-ack sessions Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b6beb079 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b6beb079 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b6beb079 Branch: refs/heads/master Commit: b6beb0791ea5a8dc5d317092497b52c7e87afa7c Parents: 1c080fb Author: Robert Gemmell <rob...@apache.org> Authored: Thu Jan 15 16:03:08 2015 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Thu Jan 15 16:35:08 2015 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/jms/JmsMessageConsumer.java | 88 +++++++++--------- .../java/org/apache/qpid/jms/JmsSession.java | 10 ++ .../qpid/jms/provider/amqp/AmqpConsumer.java | 4 +- .../qpid/jms/consumer/JmsAutoAckTest.java | 97 +++++++++++++++++++- 4 files changed, 151 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 9a0bc32..bce5b44 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -196,7 +196,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC sendPullCommand(0); try { - return copy(ack(this.messageQueue.dequeue(-1))); + return copy(ackFromReceive(this.messageQueue.dequeue(-1))); } catch (Exception e) { throw JmsExceptionSupport.create(e); } @@ -216,7 +216,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC if (timeout > 0) { try { - return copy(ack(this.messageQueue.dequeue(timeout))); + return copy(ackFromReceive(this.messageQueue.dequeue(timeout))); } catch (InterruptedException e) { throw JmsExceptionSupport.create(e); } @@ -236,7 +236,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC checkMessageListener(); sendPullCommand(-1); - return copy(ack(this.messageQueue.dequeueNoWait())); + return copy(ackFromReceive(this.messageQueue.dequeueNoWait())); } protected void checkClosed() throws IllegalStateException { @@ -252,7 +252,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC return envelope.getMessage().copy(); } - JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException { + JmsInboundMessageDispatch ackFromReceive(final JmsInboundMessageDispatch envelope) throws JMSException { if (envelope != null && envelope.getMessage() != null) { JmsMessage message = envelope.getMessage(); if (message.getAcknowledgeCallback() != null) { @@ -268,7 +268,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC return envelope; } - private void doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException { + private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException { checkClosed(); try { session.acknowledge(envelope, ACK_TYPE.CONSUMED, true); @@ -276,15 +276,17 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC session.onException(ex); throw ex; } + return envelope; } - private void doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException { + private JmsInboundMessageDispatch doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException { try { session.acknowledge(envelope, ACK_TYPE.DELIVERED, true); } catch (JMSException ex) { session.onException(ex); throw ex; } + return envelope; } private void doAckReleased(final JmsInboundMessageDispatch envelope) throws JMSException { @@ -325,29 +327,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } if (this.messageListener != null && this.started) { - session.getExecutor().execute(new Runnable() { - @Override - public void run() { - JmsInboundMessageDispatch envelope; - while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) { - try { - // TODO - We are currently acking early. We need to ack after onMessage - // with either a delivered or a consumed ack based on the session - // ack mode. We also need to check for the message having been - // acked by message.acknowledge() in onMessage so we don't do a - // delivered ack following a real ack in the case of client ack - // mode or a future individual ack mode. - messageListener.onMessage(copy(ack(envelope))); - } catch (Exception e) { - // TODO - We need to handle exception of on message with some other - // ack such as rejected and consider adding a redlivery policy - // to control when we might just poison the message with an ack - // of modified set to not deliverable here. - session.getConnection().onException(e); - } - } - } - }); + session.getExecutor().execute(new MessageDeliverTask()); } else { if (availableListener != null) { availableListener.onMessageAvailable(this); @@ -360,7 +340,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC try { this.started = true; this.messageQueue.start(); - drainMessageQueueToListener(); //TODO: this should be handed off to the executor. + drainMessageQueueToListener(); } finally { lock.unlock(); } @@ -385,8 +365,6 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } public void suspendForRollback() throws JMSException { - // TODO: this isnt really sufficient if we are in onMessage and there - // are previously-scheduled delivery tasks remaining after the currently executing one stop(); session.getConnection().stopResource(consumerInfo); @@ -407,19 +385,8 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } void drainMessageQueueToListener() { - MessageListener listener = this.messageListener; - if (listener != null) { - if (!this.messageQueue.isEmpty()) { - List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll(); - for (JmsInboundMessageDispatch envelope : drain) { - try { - listener.onMessage(copy(ack(envelope))); - } catch (Exception e) { - session.getConnection().onException(e); - } - } - drain.clear(); - } + if (this.messageListener != null && this.started) { + session.getExecutor().execute(new MessageDeliverTask()); } } @@ -573,4 +540,35 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC return prefetch; } + + private final class MessageDeliverTask implements Runnable { + @Override + public void run() { + JmsInboundMessageDispatch envelope; + while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) { + try { + JmsMessage copy = null; + if(acknowledgementMode == Session.AUTO_ACKNOWLEDGE) { + copy = copy(doAckDelivered(envelope)); + } else { + copy = copy(ackFromReceive(envelope)); + } + session.clearSessionRecovered(); + + messageListener.onMessage(copy); + + if(acknowledgementMode == Session.AUTO_ACKNOWLEDGE && !session.isSessionRecovered()) { + doAckConsumed(envelope); + } + } catch (Exception e) { + // TODO - We need to handle exception of on message with some other + // ack such as rejected and consider adding a redlivery policy + // to control when we might just poison the message with an ack + // of modified set to not deliverable here. + session.getConnection().onException(e); + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 4efd45d..fa6ecc6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -96,6 +96,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa private final AtomicLong consumerIdGenerator = new AtomicLong(); private final AtomicLong producerIdGenerator = new AtomicLong(); private JmsLocalTransactionContext transactionContext; + private boolean sessionRecovered; protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; @@ -151,6 +152,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa } this.connection.recover(getSessionId()); + sessionRecovered = true; } @Override @@ -1012,4 +1014,12 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa public JmsLocalTransactionContext getTransactionContext() { return transactionContext; } + + boolean isSessionRecovered() { + return sessionRecovered; + } + + void clearSessionRecovered() { + sessionRecovered = false; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 8d153c6..d851630 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -271,8 +271,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } sendFlowIfNeeded(); } else if (ackType.equals(ACK_TYPE.CONSUMED)) { - // A Consumer may not always send a delivered ACK so we need to check to - // ensure we don't add to much credit to the link. + // A Consumer may not always send a DELIVERED ack so we need to + // check to ensure we don't add too much credit to the link. if (isPresettle() || delivered.remove(envelope) == null) { sendFlowIfNeeded(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b6beb079/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java index db2a104..e3f3faf 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java @@ -17,9 +17,14 @@ package org.apache.qpid.jms.consumer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -97,4 +102,94 @@ public class JmsAutoAckTest extends AmqpTestSupport { } })); } -} + + @Test(timeout = 600000) + public void testRecoverInOnMessage() throws Exception { + connection = createAmqpConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + sendMessages(connection, queue, 2); + + CountDownLatch latch = new CountDownLatch(1); + AutoAckRecoverMsgListener listener = new AutoAckRecoverMsgListener(latch, session); + consumer.setMessageListener(listener); + + connection.start(); + + assertTrue("Timed out waiting for async listener", latch.await(500, TimeUnit.SECONDS)); + assertFalse("Test failed in listener, consult logs", listener.getFailed()); + } + + private static class AutoAckRecoverMsgListener implements MessageListener { + final Session session; + final CountDownLatch latch; + private boolean seenFirstMessage = false; + private boolean seenSecondMessage = false; + private boolean complete = false; + private boolean failed = false; + + public AutoAckRecoverMsgListener(CountDownLatch latch, Session session) { + this.latch = latch; + this.session = session; + } + + @Override + public void onMessage(Message message) { + try { + int msgNumProperty = message.getIntProperty(MESSAGE_NUMBER); + + if(complete ){ + LOG.info("Test already complete, ignoring delivered message: " + msgNumProperty); + } + + if (msgNumProperty == 1) { + if (!seenFirstMessage) { + LOG.info("Received first message."); + seenFirstMessage = true; + } else { + LOG.error("Received first message again."); + complete(true); + } + } else { + if (msgNumProperty != 2) { + LOG.error("Received unexpected message: " + msgNumProperty); + complete(true); + return; + } + + if(!seenSecondMessage){ + seenSecondMessage = true; + LOG.info("Received second message. Now calling recover()"); + session.recover(); + } else { + LOG.info("Received second message again as expected."); + if(message.getJMSRedelivered()) { + LOG.info("Message was marked redelivered."); + complete(false); + } else { + LOG.error("Message was not marked redelivered."); + complete(true); + } + } + } + } catch (JMSException e) { + LOG.error("Exception caught in listener", e); + complete(true); + } + } + + public boolean getFailed() { + return failed; + } + + private void complete(boolean fail) { + failed = fail; + complete = true; + latch.countDown(); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org