Repository: activemq
Updated Branches:
  refs/heads/master 9827427f4 -> 2b7bb6f81


https://issues.apache.org/jira/browse/AMQ-5837

Switching to a List to track dispatched messages in a TopicSubscription
to be consistent with a PrefetchSubscription and to prevent an error
in case acks come back out of order.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2b7bb6f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2b7bb6f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2b7bb6f8

Branch: refs/heads/master
Commit: 2b7bb6f81b0ed0a399fd8e85ee88ac0f62fbd9c9
Parents: 9827427
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Thu Aug 6 17:12:18 2015 +0000
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Thu Aug 6 17:14:09 2015 +0000

----------------------------------------------------------------------
 .../broker/region/TopicSubscription.java        | 81 ++++++++++----------
 1 file changed, 42 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2b7bb6f8/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 17a3137..b20d080 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -17,11 +17,9 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.Comparator;
-import java.util.Iterator;
+import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -76,19 +74,9 @@ public class TopicSubscription extends AbstractSubscription {
     protected boolean active = false;
     protected boolean discarding = false;
 
-
-    /**
-     * This Map is used to keep track of messages that have been dispatched in sorted order to
-     * optimize message acknowledgement
-     */
-    private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>(
-            new Comparator<MessageId>() {
-                @Override
-                public int compare(MessageId m1, MessageId m2) {
-                    return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1
-                            : Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId()));
-                }
-            });
+    //Used for inflight message size calculations
+    protected final Object dispatchLock = new Object();
+    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -267,11 +255,13 @@ public class TopicSubscription extends AbstractSubscription {
                     MessageReference node = matched.next();
                     node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
-                        matched.remove();
-                        getSubscriptionStatistics().getDispatched().increment();
-                        dispatched.put(node.getMessageId(), node);
-                        getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
-                        node.decrementReferenceCount();
+                        synchronized(dispatchLock) {
+                            matched.remove();
+                            getSubscriptionStatistics().getDispatched().increment();
+                            dispatched.add(node);
+                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+                            node.decrementReferenceCount();
+                        }
                         break;
                     }
                 }
@@ -403,23 +393,31 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     /**
-     * Update the inflight statistics on message ack.  Since a message ack could be a range,
-     * we need to grab a subtree of the dispatched map to acknowledge messages.  Finding the
-     * subMap is an O(log n) operation.
+     * Update the inflight statistics on message ack.
      * @param ack
      */
     private void updateInflightMessageSizeOnAck(final MessageAck ack) {
-        if (ack.getFirstMessageId() != null) {
-            NavigableMap<MessageId, MessageReference> acked = dispatched
-                    .subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true);
-            Iterator<MessageId> i = acked.keySet().iterator();
-            while (i.hasNext()) {
-                getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize());
-                i.remove();
+        synchronized(dispatchLock) {
+            boolean inAckRange = false;
+            List<MessageReference> removeList = new ArrayList<MessageReference>();
+            for (final MessageReference node : dispatched) {
+                MessageId messageId = node.getMessageId();
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
+                    inAckRange = true;
+                }
+                if (inAckRange) {
+                    removeList.add(node);
+                    if (ack.getLastMessageId().equals(messageId)) {
+                        break;
+                    }
+                }
+            }
+
+            for (final MessageReference node : removeList) {
+                dispatched.remove(node);
+                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
             }
-        } else {
-            getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize());
-            dispatched.remove(ack.getLastMessageId());
         }
     }
 
@@ -645,9 +643,12 @@ public class TopicSubscription extends AbstractSubscription {
         md.setConsumerId(info.getConsumerId());
         if (node != null) {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
-            getSubscriptionStatistics().getDispatched().increment();
-            dispatched.put(node.getMessageId(), node);
-            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+            synchronized(dispatchLock) {
+                getSubscriptionStatistics().getDispatched().increment();
+                dispatched.add(node);
+                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+            }
+
             // Keep track if this subscription is receiving messages from a single destination.
             if (singleDestination) {
                 if (destination == null) {
@@ -729,7 +730,9 @@ public class TopicSubscription extends AbstractSubscription {
             }
         }
         setSlowConsumer(false);
-        dispatched.clear();
+        synchronized(dispatchLock) {
+            dispatched.clear();
+        }
     }
 
     @Override

Reply via email to