Repository: activemq Updated Branches: refs/heads/trunk 7948d6905 -> dbb1d8b83
https://issues.apache.org/jira/browse/AMQ-5513 - a lastDeliveredSequenceId of -1 now indicates that nothing was received; this value is respected when a queue subscription is removed and when a durable subscription is deactivated. A value of 0 still means no information is available, so old clients see the same behaviour as before. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dbb1d8b8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dbb1d8b8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dbb1d8b8 Branch: refs/heads/trunk Commit: dbb1d8b83d2ee5c5a38c55834a70cd21a2dd2ad0 Parents: 05f6cd6 Author: gtully <gary.tu...@gmail.com> Authored: Fri Jan 9 13:28:58 2015 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Fri Jan 9 13:30:59 2015 +0000 ---------------------------------------------------------------------- .../broker/region/DurableTopicSubscription.java | 14 +++--- .../apache/activemq/broker/region/Queue.java | 4 +- .../apache/activemq/broker/region/Topic.java | 2 +- .../activemq/broker/region/TopicRegion.java | 4 +- .../activemq/ActiveMQMessageConsumer.java | 2 +- .../org/apache/activemq/JmsRedeliveredTest.java | 47 ++++++++++++++++++++ 6 files changed, 61 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 4c19c62..8df2819 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -184,7 +184,7 @@ public class 
DurableTopicSubscription extends PrefetchSubscription implements Us } } - public void deactivate(boolean keepDurableSubsActive) throws Exception { + public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception { LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this); active.set(false); offlineTimestamp.set(System.currentTimeMillis()); @@ -214,11 +214,13 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us for (final MessageReference node : dispatched) { // Mark the dispatched messages as redelivered for next time. - Integer count = redeliveredMessages.get(node.getMessageId()); - if (count != null) { - redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); - } else { - redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); + if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) { + Integer count = redeliveredMessages.get(node.getMessageId()); + if (count != null) { + redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); + } else { + redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); + } } if (keepDurableSubsActive && pending.isTransient()) { pending.addMessageFirst(node); http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index f5f2efe..89b9081 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -547,12 +547,12 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index List<MessageReference> unAckedMessages = sub.remove(context, this); // locate last redelivered in unconsumed list (list in delivery rather than seq order) - if (lastDeiveredSequenceId != 0) { + if (lastDeiveredSequenceId > 0) { for (MessageReference ref : unAckedMessages) { if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { lastDeliveredRef = ref; markAsRedelivered = true; - LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId()); + LOG.error("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId()); break; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index cd144c3..eff9619 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -203,7 +203,7 @@ public class Topic extends BaseDestination implements Task { if (removed != null) { destinationStatistics.getConsumers().decrement(); // deactivate and remove - removed.deactivate(false); + removed.deactivate(false, 0l); consumers.remove(removed); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index cea5bb7..383f240 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java 
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -155,7 +155,7 @@ public class TopicRegion extends AbstractRegion { if ((sub.context != context) || (sub.info != info)) { sub.info = info; sub.context = context; - sub.deactivate(keepDurableSubsActive); + sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); } subscriptions.put(info.getConsumerId(), sub); } @@ -185,7 +185,7 @@ public class TopicRegion extends AbstractRegion { // as what is in the sub. otherwise, during linksteal // sub will get new context, but will be removed here if (sub.getContext() == context) - sub.deactivate(keepDurableSubsActive); + sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); } } else { super.removeConsumer(context, info); http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index f43b56d..0808ead 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); private MessageAck pendingAck; - private long lastDeliveredSequenceId; + private long lastDeliveredSequenceId = -1; private IOException failureError; http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java index 63c5911..72a1a28 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -400,6 +401,52 @@ public class JmsRedeliveredTest extends TestCase { session.close(); } + public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception { + connection.setClientID(getName()); + connection.start(); + + Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("queue-" + getName()); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = createProducer(session, queue); + producer.send(createTextMessage(session)); + session.commit(); + + TimeUnit.SECONDS.sleep(1); + consumer.close(); + + consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); + session.close(); + } + + public void testNoReceiveDurableConsumerDoesNotIncrementRedelivery() throws Exception { + connection.setClientID(getName()); + connection.start(); + + Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Topic topic = session.createTopic("topic-" + getName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, "sub"); + + MessageProducer producer = createProducer(session, topic); + producer.send(createTextMessage(session)); + session.commit(); + + TimeUnit.SECONDS.sleep(1); + consumer.close(); + + consumer = session.createDurableSubscriber(topic, "sub"); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + assertFalse("Message should not be 
redelivered.", msg.getJMSRedelivered()); + session.close(); + } + /** * Creates a text message. *