Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42ad1039 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42ad1039 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42ad1039 Branch: refs/heads/trunk Commit: 42ad1039cb51b47a16f46f3d8f0fe8bf36ffdd1d Parents: 8644090 Author: Dhiraj Bokde <[email protected]> Authored: Tue May 13 13:15:04 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 26 11:07:19 2014 +0200 ---------------------------------------------------------------------- .../activemq/broker/region/DurableTopicSubscription.java | 5 +++++ .../policy/RetainedMessageSubscriptionRecoveryPolicy.java | 10 +++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index c82e6ef..e61a608 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -137,6 +137,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us dispatchPending(); } + // used by RetaineMessageSubscriptionRecoveryPolicy + public boolean isEmpty(Topic topic) { + return pending.isEmpty(topic); + } + public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception { if (!active.get()) { this.context = context; http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java index f1573fa..ea07c8b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; @@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe sub.addRecoveredMessage(context, retainedMessage); } if (wrapped != null) { - wrapped.recover(context, topic, sub); + // retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions + boolean recover = true; + if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) { + recover = false; + } + if (recover) { + wrapped.recover(context, topic, sub); + } } }
