Repository: activemq Updated Branches: refs/heads/master 9827427f4 -> 2b7bb6f81
https://issues.apache.org/jira/browse/AMQ-5837 Switching to a List to track dispatched messages in a TopicSubscription to be consistent with a PrefetchSubscription and to prevent an error in case acks come back out of order. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2b7bb6f8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2b7bb6f8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2b7bb6f8 Branch: refs/heads/master Commit: 2b7bb6f81b0ed0a399fd8e85ee88ac0f62fbd9c9 Parents: 9827427 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Thu Aug 6 17:12:18 2015 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Thu Aug 6 17:14:09 2015 +0000 ---------------------------------------------------------------------- .../broker/region/TopicSubscription.java | 81 ++++++++++---------- 1 file changed, 42 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2b7bb6f8/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 17a3137..b20d080 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -17,11 +17,9 @@ package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.Comparator; -import java.util.Iterator; +import java.util.ArrayList; import java.util.LinkedList; -import java.util.NavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -76,19 +74,9 @@ public class TopicSubscription extends AbstractSubscription { protected boolean active = false; protected boolean discarding = false; - - /** - * This Map is used to keep track of messages that have been dispatched in sorted order to - * optimize message acknowledgement - */ - private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>( - new Comparator<MessageId>() { - @Override - public int compare(MessageId m1, MessageId m2) { - return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1 - : Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId())); - } - }); + //Used for inflight message size calculations + protected final Object dispatchLock = new Object(); + protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); @@ -267,11 +255,13 @@ public class TopicSubscription extends AbstractSubscription { MessageReference node = matched.next(); node.decrementReferenceCount(); if (node.getMessageId().equals(mdn.getMessageId())) { - matched.remove(); - getSubscriptionStatistics().getDispatched().increment(); - dispatched.put(node.getMessageId(), node); - getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); - node.decrementReferenceCount(); + synchronized(dispatchLock) { + matched.remove(); + getSubscriptionStatistics().getDispatched().increment(); + dispatched.add(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + node.decrementReferenceCount(); + } break; } } @@ -403,23 +393,31 @@ public class TopicSubscription extends AbstractSubscription { } /** - * Update the inflight statistics on message ack. Since a message ack could be a range, - * we need to grab a subtree of the dispatched map to acknowledge messages. Finding the - * subMap is an O(log n) operation. + * Update the inflight statistics on message ack. * @param ack */ private void updateInflightMessageSizeOnAck(final MessageAck ack) { - if (ack.getFirstMessageId() != null) { - NavigableMap<MessageId, MessageReference> acked = dispatched - .subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true); - Iterator<MessageId> i = acked.keySet().iterator(); - while (i.hasNext()) { - getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize()); - i.remove(); + synchronized(dispatchLock) { + boolean inAckRange = false; + List<MessageReference> removeList = new ArrayList<MessageReference>(); + for (final MessageReference node : dispatched) { + MessageId messageId = node.getMessageId(); + if (ack.getFirstMessageId() == null + || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + removeList.add(node); + if (ack.getLastMessageId().equals(messageId)) { + break; + } + } + } + + for (final MessageReference node : removeList) { + dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); } - } else { - getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize()); - dispatched.remove(ack.getLastMessageId()); } } @@ -645,9 +643,12 @@ public class TopicSubscription extends AbstractSubscription { md.setConsumerId(info.getConsumerId()); if (node != null) { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); - getSubscriptionStatistics().getDispatched().increment(); - dispatched.put(node.getMessageId(), node); - getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + synchronized(dispatchLock) { + getSubscriptionStatistics().getDispatched().increment(); + dispatched.add(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + } + // Keep track if this subscription is receiving messages from a single destination. if (singleDestination) { if (destination == null) { @@ -729,7 +730,9 @@ public class TopicSubscription extends AbstractSubscription { } } setSlowConsumer(false); - dispatched.clear(); + synchronized(dispatchLock) { + dispatched.clear(); + } } @Override
