Repository: qpid-jms Updated Branches: refs/heads/master 592969627 -> 3b416b282
https://issues.apache.org/jira/browse/QPIDJMS-97 https://issues.apache.org/jira/browse/QPIDJMS-92 Fix issue of stuck consumer during redelivery policy enforcement, also fix some other issues related to pull consumer. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3b416b28 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3b416b28 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3b416b28 Branch: refs/heads/master Commit: 3b416b28289c1d0eaf8b4fbe2f6923b2f46292dc Parents: 5929696 Author: Timothy Bish <[email protected]> Authored: Fri Aug 21 18:32:02 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Aug 21 18:32:02 2015 -0400 ---------------------------------------------------------------------- .../org/apache/qpid/jms/JmsMessageConsumer.java | 59 +++++++-------- .../jms/message/JmsInboundMessageDispatch.java | 10 +++ .../qpid/jms/provider/amqp/AmqpConsumer.java | 14 +--- .../qpid/jms/provider/amqp/AmqpProvider.java | 8 ++- .../jms/integration/SessionIntegrationTest.java | 3 +- .../qpid/jms/consumer/JmsZeroPrefetchTest.java | 36 ++++++++++ .../JmsTransactionRedeliveryPolicyTest.java | 75 +++++++++++++++++++- 7 files changed, 162 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index b3ae7f3..f3f7740 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -226,16 +226,7 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC timeout = -1; } - sendPullCommand(timeout); - - JmsInboundMessageDispatch envelope = null; - if (isPullConsumer()) { - envelope = dequeue(-1); // Let server tell us if empty. - } else { - envelope = dequeue(timeout); // Check local prefetch only. - } - - return copy(ackFromReceive(envelope)); + return copy(ackFromReceive(dequeue(timeout))); } /** @@ -247,16 +238,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC public Message receiveNoWait() throws JMSException { checkClosed(); checkMessageListener(); - sendPullCommand(0); - - JmsInboundMessageDispatch envelope = null; - if (isPullConsumer()) { - envelope = dequeue(-1); // Let server tell us if empty. - } else { - envelope = dequeue(0); // Check local prefetch only. - } - - return copy(ackFromReceive(envelope)); + return copy(ackFromReceive(dequeue(0))); } /** @@ -281,8 +263,16 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC deadline = System.currentTimeMillis() + timeout; } + sendPullCommand(timeout); + while (true) { - JmsInboundMessageDispatch envelope = messageQueue.dequeue(timeout); + JmsInboundMessageDispatch envelope = null; + if (isPullConsumer()) { + envelope = messageQueue.dequeue(-1); + } else { + envelope = messageQueue.dequeue(timeout); + } + if (envelope == null) { if (timeout > 0 && !messageQueue.isClosed()) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); @@ -305,9 +295,8 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } sendPullCommand(timeout); } else if (redeliveryExceeded(envelope)) { - LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), envelope); - // TODO - Future - // Reject this delivery as not deliverable here + LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope); + doAckUndeliverable(envelope); if (timeout > 0) { timeout = Math.max(deadline - 
System.currentTimeMillis(), 0); } @@ -334,10 +323,12 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) { - // TODO - Future - // Check for message that have been redelivered to see if they exceed - // some set maximum redelivery count - return false; + LOG.info("checking envelope with {} redeliveries", envelope.getRedeliveryCount()); + + JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy(); + return redeliveryPolicy != null && + redeliveryPolicy.getMaxRedeliveries() != JmsRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES && + redeliveryPolicy.getMaxRedeliveries() < envelope.getRedeliveryCount(); } protected void checkClosed() throws IllegalStateException { @@ -409,6 +400,15 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } } + private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) throws JMSException { + try { + session.acknowledge(envelope, ACK_TYPE.POISONED); + } catch (JMSException ex) { + session.onException(ex); + throw ex; + } + } + private void doAckReleased(final JmsInboundMessageDispatch envelope) throws JMSException { try { session.acknowledge(envelope, ACK_TYPE.RELEASED); @@ -690,6 +690,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC if (consumeExpiredMessage(envelope)) { LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope); doAckExpired(envelope); + } else if (redeliveryExceeded(envelope)) { + LOG.trace("{} filtered message with excessive redlivery count: {}", getConsumerId(), envelope); + doAckUndeliverable(envelope); } else { boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java 
---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java index 817c3c1..0f0f1ee 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java @@ -62,6 +62,16 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { return enqueueFirst; } + public int getRedeliveryCount() { + int redeliveryCount = 0; + + if (message != null) { + redeliveryCount = message.getFacade().getRedeliveryCount(); + } + + return redeliveryCount; + } + @Override public String toString() { return "JmsInboundMessageDispatch {sequence = " + sequence http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 7710ec2..ec3a786 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -423,6 +423,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver public void run() { if (getEndpoint().getRemoteCredit() != 0) { getEndpoint().drain(0); + session.getProvider().pumpToProtonTransport(); } } }, timeout); @@ -479,17 +480,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private void processDelivery(Delivery incoming) throws Exception { setDefaultDeliveryState(incoming, Released.getInstance()); Message amqpMessage = 
decodeIncomingMessage(incoming); - long deliveryCount = amqpMessage.getDeliveryCount(); - int maxRedeliveries = getJmsResource().getRedeliveryPolicy().getMaxRedeliveries(); - - if (maxRedeliveries >= 0 && deliveryCount > maxRedeliveries) { - LOG.trace("{} rejecting delivery that exceeds max redelivery count. {}", this, amqpMessage.getMessageId()); - deliveryFailed(incoming); - return; - } else { - getEndpoint().advance(); - } - JmsMessage message = null; try { message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage); @@ -504,6 +494,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver return; } + getEndpoint().advance(); + // Let the message do any final processing before sending it onto a consumer. // We could defer this to a later stage such as the JmsConnection or even in // the JmsMessageConsumer dispatch method if we needed to. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index ff36ab3..f6ef9ed 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -815,7 +815,11 @@ public class AmqpProvider implements Provider, TransportListener { } } - private boolean pumpToProtonTransport(AsyncResult request) { + protected boolean pumpToProtonTransport() { + return pumpToProtonTransport(NOOP_REQUEST); + } + + protected boolean pumpToProtonTransport(AsyncResult request) { try { boolean done = false; while (!done) { @@ -1087,7 +1091,7 @@ public class AmqpProvider implements Provider, TransportListener { long now = System.currentTimeMillis(); long deadline = 
protonTransport.tick(now); - boolean pumpSucceeded = pumpToProtonTransport(NOOP_REQUEST); + boolean pumpSucceeded = pumpToProtonTransport(); if (protonTransport.isClosed()) { LOG.info("IdleTimeoutCheck closed the transport due to the peer exceeding our requested idle-timeout."); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index e7e5bea..6b91559 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -927,7 +927,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectDisposition(true, modified); } - session.createConsumer(queue); + MessageConsumer consumer = session.createConsumer(queue); + consumer.receive(100); testPeer.waitForAllHandlersToComplete(1000); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java index bfcdf87..a6c98e4 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java @@ -275,4 +275,40 @@ 
public class JmsZeroPrefetchTest extends AmqpTestSupport { assertNotNull(answer); assertEquals("Should have received a message!", answer.getText(), "Msg1"); } + + @Test(timeout=60000) + public void testConsumerReceivePrefetchZeroRedeliveryZero() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + // push message to queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.prefetch.zero"); + MessageProducer producer = session.createProducer(queue); + TextMessage textMessage = session.createTextMessage("test Message"); + producer.send(textMessage); + session.close(); + + // consume and rollback - increase redelivery counter on message + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(2000); + assertNotNull(message); + session.rollback(); + session.close(); + + // Reconnect with zero prefetch and zero redeliveries allowed. 
+ connection.close(); + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + ((JmsConnection)connection).getRedeliveryPolicy().setMaxRedeliveries(0); + connection.start(); + + // try consume with timeout - expect it to timeout and return NULL message + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(queue); + message = consumer.receive(1000); + + assertNull(message); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java index 5bd4d1e..c5089f2 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java @@ -22,8 +22,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; @@ -44,7 +49,7 @@ public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport { } @Test(timeout = 30000) - public void testConsumeAndRollbackWithMaxRedeliveries() throws Exception { + public void 
testSyncConsumeAndRollbackWithMaxRedeliveries() throws Exception { final int MAX_REDELIVERIES = 5; final int MSG_COUNT = 5; @@ -85,6 +90,8 @@ public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport { LOG.info("Queue size after session rollback is: {}", queueView.getQueueSize()); } + assertNull(consumer.receive(50)); + assertTrue("Message should get DLQ'd", Wait.waitFor(new Wait.Condition() { @Override @@ -96,8 +103,74 @@ public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport { QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ"); assertEquals(MSG_COUNT, dlq.getQueueSize()); + session.commit(); + } + + @Test(timeout = 30000) + public void testAsyncConsumeAndRollbackWithMaxRedeliveries() throws Exception { + final int MAX_REDELIVERIES = 5; + final int MSG_COUNT = 5; + + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + sendMessages(connection, queue, MSG_COUNT); + + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + + // Consume the message for the first time. 
+ Message incoming = null; + for (int i = 0; i < MSG_COUNT; ++i) { + incoming = consumer.receive(2000); + assertNotNull(incoming); + assertFalse(incoming.getJMSRedelivered()); + assertTrue(incoming instanceof TextMessage); + } + session.rollback(); + + for (int i = 0; i < MAX_REDELIVERIES; ++i) { + LOG.info("Queue size before consume is: {}", queueView.getQueueSize()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + final CountDownLatch done = new CountDownLatch(MSG_COUNT); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + assertTrue(message.getJMSRedelivered()); + assertTrue(message instanceof TextMessage); + + done.countDown(); + } catch (JMSException e) { + } + } + }); + + assertTrue("Not All Messages Received", done.await(10, TimeUnit.SECONDS)); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + consumer.setMessageListener(null); + session.rollback(); + LOG.info("Queue size after session rollback is: {}", queueView.getQueueSize()); + } + assertNull(consumer.receive(50)); + assertTrue("Message should get DLQ'd", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + + QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ"); + assertEquals(MSG_COUNT, dlq.getQueueSize()); + session.commit(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
