Repository: qpid-jms
Updated Branches:
  refs/heads/master 592969627 -> 3b416b282


https://issues.apache.org/jira/browse/QPIDJMS-97
https://issues.apache.org/jira/browse/QPIDJMS-92

Fix issue of stuck consumer during redlivery policy enforcement, also
fix some other issues related to pull consumer.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3b416b28
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3b416b28
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3b416b28

Branch: refs/heads/master
Commit: 3b416b28289c1d0eaf8b4fbe2f6923b2f46292dc
Parents: 5929696
Author: Timothy Bish <[email protected]>
Authored: Fri Aug 21 18:32:02 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Fri Aug 21 18:32:02 2015 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 59 +++++++--------
 .../jms/message/JmsInboundMessageDispatch.java  | 10 +++
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 14 +---
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  8 ++-
 .../jms/integration/SessionIntegrationTest.java |  3 +-
 .../qpid/jms/consumer/JmsZeroPrefetchTest.java  | 36 ++++++++++
 .../JmsTransactionRedeliveryPolicyTest.java     | 75 +++++++++++++++++++-
 7 files changed, 162 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b3ae7f3..f3f7740 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -226,16 +226,7 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
             timeout = -1;
         }
 
-        sendPullCommand(timeout);
-
-        JmsInboundMessageDispatch envelope = null;
-        if (isPullConsumer()) {
-            envelope = dequeue(-1); // Let server tell us if empty.
-        } else {
-            envelope = dequeue(timeout); // Check local prefetch only.
-        }
-
-        return copy(ackFromReceive(envelope));
+        return copy(ackFromReceive(dequeue(timeout)));
     }
 
     /**
@@ -247,16 +238,7 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
     public Message receiveNoWait() throws JMSException {
         checkClosed();
         checkMessageListener();
-        sendPullCommand(0);
-
-        JmsInboundMessageDispatch envelope = null;
-        if (isPullConsumer()) {
-            envelope = dequeue(-1); // Let server tell us if empty.
-        } else {
-            envelope = dequeue(0); // Check local prefetch only.
-        }
-
-        return copy(ackFromReceive(envelope));
+        return copy(ackFromReceive(dequeue(0)));
     }
 
     /**
@@ -281,8 +263,16 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
                 deadline = System.currentTimeMillis() + timeout;
             }
 
+            sendPullCommand(timeout);
+
             while (true) {
-                JmsInboundMessageDispatch envelope = 
messageQueue.dequeue(timeout);
+                JmsInboundMessageDispatch envelope = null;
+                if (isPullConsumer()) {
+                    envelope = messageQueue.dequeue(-1);
+                } else {
+                    envelope = messageQueue.dequeue(timeout);
+                }
+
                 if (envelope == null) {
                     if (timeout > 0 && !messageQueue.isClosed()) {
                         timeout = Math.max(deadline - 
System.currentTimeMillis(), 0);
@@ -305,9 +295,8 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
                     }
                     sendPullCommand(timeout);
                 } else if (redeliveryExceeded(envelope)) {
-                    LOG.debug("{} received with excessive redelivered: {}", 
getConsumerId(), envelope);
-                    // TODO - Future
-                    // Reject this delivery as not deliverable here
+                    LOG.debug("{} filtered message with excessive redelivery 
count: {}", getConsumerId(), envelope);
+                    doAckUndeliverable(envelope);
                     if (timeout > 0) {
                         timeout = Math.max(deadline - 
System.currentTimeMillis(), 0);
                     }
@@ -334,10 +323,12 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
     }
 
     protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
-        // TODO - Future
-        // Check for message that have been redelivered to see if they exceed
-        // some set maximum redelivery count
-        return false;
+        LOG.info("checking envelope with {} redeliveries", 
envelope.getRedeliveryCount());
+
+        JmsRedeliveryPolicy redeliveryPolicy = 
consumerInfo.getRedeliveryPolicy();
+        return redeliveryPolicy != null &&
+               redeliveryPolicy.getMaxRedeliveries() != 
JmsRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES &&
+               redeliveryPolicy.getMaxRedeliveries() < 
envelope.getRedeliveryCount();
     }
 
     protected void checkClosed() throws IllegalStateException {
@@ -409,6 +400,15 @@ public class JmsMessageConsumer implements 
MessageConsumer, JmsMessageAvailableC
         }
     }
 
+    private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) 
throws JMSException {
+        try {
+            session.acknowledge(envelope, ACK_TYPE.POISONED);
+        } catch (JMSException ex) {
+            session.onException(ex);
+            throw ex;
+        }
+    }
+
     private void doAckReleased(final JmsInboundMessageDispatch envelope) 
throws JMSException {
         try {
             session.acknowledge(envelope, ACK_TYPE.RELEASED);
@@ -690,6 +690,9 @@ public class JmsMessageConsumer implements MessageConsumer, 
JmsMessageAvailableC
                     if (consumeExpiredMessage(envelope)) {
                         LOG.trace("{} filtered expired message: {}", 
getConsumerId(), envelope);
                         doAckExpired(envelope);
+                    } else if (redeliveryExceeded(envelope)) {
+                        LOG.trace("{} filtered message with excessive 
redlivery count: {}", getConsumerId(), envelope);
+                        doAckUndeliverable(envelope);
                     } else {
                         boolean autoAckOrDupsOk = acknowledgementMode == 
Session.AUTO_ACKNOWLEDGE ||
                                                   acknowledgementMode == 
Session.DUPS_OK_ACKNOWLEDGE;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index 817c3c1..0f0f1ee 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -62,6 +62,16 @@ public class JmsInboundMessageDispatch extends 
JmsAbstractResourceId {
         return enqueueFirst;
     }
 
+    public int getRedeliveryCount() {
+        int redeliveryCount = 0;
+
+        if (message != null) {
+            redeliveryCount = message.getFacade().getRedeliveryCount();
+        }
+
+        return redeliveryCount;
+    }
+
     @Override
     public String toString() {
         return "JmsInboundMessageDispatch {sequence = " + sequence

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7710ec2..ec3a786 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -423,6 +423,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
                     public void run() {
                         if (getEndpoint().getRemoteCredit() != 0) {
                             getEndpoint().drain(0);
+                            session.getProvider().pumpToProtonTransport();
                         }
                     }
                 }, timeout);
@@ -479,17 +480,6 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     private void processDelivery(Delivery incoming) throws Exception {
         setDefaultDeliveryState(incoming, Released.getInstance());
         Message amqpMessage = decodeIncomingMessage(incoming);
-        long deliveryCount = amqpMessage.getDeliveryCount();
-        int maxRedeliveries = 
getJmsResource().getRedeliveryPolicy().getMaxRedeliveries();
-
-        if (maxRedeliveries >= 0 && deliveryCount > maxRedeliveries) {
-            LOG.trace("{} rejecting delivery that exceeds max redelivery 
count. {}", this, amqpMessage.getMessageId());
-            deliveryFailed(incoming);
-            return;
-        } else {
-            getEndpoint().advance();
-        }
-
         JmsMessage message = null;
         try {
             message = AmqpJmsMessageBuilder.createJmsMessage(this, 
amqpMessage);
@@ -504,6 +494,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             return;
         }
 
+        getEndpoint().advance();
+
         // Let the message do any final processing before sending it onto a 
consumer.
         // We could defer this to a later stage such as the JmsConnection or 
even in
         // the JmsMessageConsumer dispatch method if we needed to.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index ff36ab3..f6ef9ed 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -815,7 +815,11 @@ public class AmqpProvider implements Provider, 
TransportListener {
         }
     }
 
-    private boolean pumpToProtonTransport(AsyncResult request) {
+    protected boolean pumpToProtonTransport() {
+        return pumpToProtonTransport(NOOP_REQUEST);
+    }
+
+    protected boolean pumpToProtonTransport(AsyncResult request) {
         try {
             boolean done = false;
             while (!done) {
@@ -1087,7 +1091,7 @@ public class AmqpProvider implements Provider, 
TransportListener {
                 long now = System.currentTimeMillis();
                 long deadline = protonTransport.tick(now);
 
-                boolean pumpSucceeded = pumpToProtonTransport(NOOP_REQUEST);
+                boolean pumpSucceeded = pumpToProtonTransport();
 
                 if (protonTransport.isClosed()) {
                     LOG.info("IdleTimeoutCheck closed the transport due to the 
peer exceeding our requested idle-timeout.");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index e7e5bea..6b91559 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -927,7 +927,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase 
{
                 testPeer.expectDisposition(true, modified);
             }
 
-            session.createConsumer(queue);
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.receive(100);
 
             testPeer.waitForAllHandlersToComplete(1000);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index bfcdf87..a6c98e4 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -275,4 +275,40 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         assertNotNull(answer);
         assertEquals("Should have received a message!", answer.getText(), 
"Msg1");
     }
+
+    @Test(timeout=60000)
+    public void testConsumerReceivePrefetchZeroRedeliveryZero() throws 
Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.prefetch.zero");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+        producer.send(textMessage);
+        session.close();
+
+        // consume and rollback - increase redelivery counter on message
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message message = consumer.receive(2000);
+        assertNotNull(message);
+        session.rollback();
+        session.close();
+
+        // Reconnect with zero prefetch and zero redeliveries allowed.
+        connection.close();
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        
((JmsConnection)connection).getRedeliveryPolicy().setMaxRedeliveries(0);
+        connection.start();
+
+        // try consume with timeout - expect it to timeout and return NULL 
message
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(queue);
+        message = consumer.receive(1000);
+
+        assertNull(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b416b28/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
index 5bd4d1e..c5089f2 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java
@@ -22,8 +22,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -44,7 +49,7 @@ public class JmsTransactionRedeliveryPolicyTest extends 
AmqpTestSupport {
     }
 
     @Test(timeout = 30000)
-    public void testConsumeAndRollbackWithMaxRedeliveries() throws Exception {
+    public void testSyncConsumeAndRollbackWithMaxRedeliveries() throws 
Exception {
         final int MAX_REDELIVERIES = 5;
         final int MSG_COUNT = 5;
 
@@ -85,6 +90,8 @@ public class JmsTransactionRedeliveryPolicyTest extends 
AmqpTestSupport {
             LOG.info("Queue size after session rollback is: {}", 
queueView.getQueueSize());
         }
 
+        assertNull(consumer.receive(50));
+
         assertTrue("Message should get DLQ'd", Wait.waitFor(new 
Wait.Condition() {
 
             @Override
@@ -96,8 +103,74 @@ public class JmsTransactionRedeliveryPolicyTest extends 
AmqpTestSupport {
         QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ");
         assertEquals(MSG_COUNT, dlq.getQueueSize());
 
+        session.commit();
+    }
+
+    @Test(timeout = 30000)
+    public void testAsyncConsumeAndRollbackWithMaxRedeliveries() throws 
Exception {
+        final int MAX_REDELIVERIES = 5;
+        final int MSG_COUNT = 5;
+
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        sendMessages(connection, queue, MSG_COUNT);
+
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        // Consume the message for the first time.
+        Message incoming = null;
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            incoming = consumer.receive(2000);
+            assertNotNull(incoming);
+            assertFalse(incoming.getJMSRedelivered());
+            assertTrue(incoming instanceof TextMessage);
+        }
+        session.rollback();
+
+        for (int i = 0; i < MAX_REDELIVERIES; ++i) {
+            LOG.info("Queue size before consume is: {}", 
queueView.getQueueSize());
+            assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+            final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        assertTrue(message.getJMSRedelivered());
+                        assertTrue(message instanceof TextMessage);
+
+                        done.countDown();
+                    } catch (JMSException e) {
+                    }
+                }
+            });
+
+            assertTrue("Not All Messages Received", done.await(10, 
TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+            consumer.setMessageListener(null);
+            session.rollback();
+            LOG.info("Queue size after session rollback is: {}", 
queueView.getQueueSize());
+        }
+
         assertNull(consumer.receive(50));
 
+        assertTrue("Message should get DLQ'd", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        }));
+
+        QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ");
+        assertEquals(MSG_COUNT, dlq.getQueueSize());
+
         session.commit();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to