Author: rgodfrey
Date: Mon Nov 14 09:59:26 2016
New Revision: 1769585

URL: http://svn.apache.org/viewvc?rev=1769585&view=rev
Log:
Replace QueueConsumerList with QueueConsumerManager

Added:
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
   (with props)
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
   (with props)
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
   (with props)
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
      - copied, changed from r1769267, 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
      - copied, changed from r1769267, 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
   (with props)
Removed:
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNode.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
Modified:
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Mon Nov 14 09:59:26 2016
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +58,7 @@ import java.util.zip.GZIPOutputStream;
 
 import javax.security.auth.Subject;
 
+import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -113,7 +115,6 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.HouseKeepingTask;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -146,18 +147,13 @@ public abstract class AbstractQueue<X ex
     private final QueueManagingVirtualHost<?> _virtualHost;
     private final DeletedChildListener _deletedChildListener = new 
DeletedChildListener();
 
-    private final AccessControlContext _immediateDeliveryContext;
+    private final QueueConsumerManagerImpl _queueConsumerManager;
 
     @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = 
"postSetAlternateExchange")
     private Exchange _alternateExchange;
 
-
-    private final QueueConsumerList _consumerList = new QueueConsumerList();
-
     private volatile QueueConsumer<?> _exclusiveSubscriber;
 
-
-
     private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
 
     private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
@@ -287,6 +283,8 @@ public abstract class AbstractQueue<X ex
 
     void setNotifyWorkDesired(final QueueConsumer consumer, final boolean 
desired)
     {
+        _queueConsumerManager.setInterest(consumer, desired);
+
         if (desired)
         {
             _activeSubscriberCount.incrementAndGet();
@@ -295,15 +293,31 @@ public abstract class AbstractQueue<X ex
         {
             _activeSubscriberCount.decrementAndGet();
 
-            final ConsumerNodeIterator consumerNodeIterator = 
_consumerList.iterator();
-            while (consumerNodeIterator.advance())
-            {
-                final QueueConsumer s = 
consumerNodeIterator.getNode().getConsumer();
-                if (s != null && s.getPriority() < consumer.getPriority())
+            // iterate over interested and notify one as long as its priority 
is higher than any notified
+            final Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getInterestedIterator();
+            while(consumerIterator.hasNext())
+            {
+                QueueConsumer<?> queueConsumer = consumerIterator.next();
+                //TODO - break here if the consumer has lower priority than 
the highest notified (presuming iterator is priority ordered)
+                if(notifyConsumer(queueConsumer))
                 {
-                    s.notifyWork();
+                    break;
                 }
             }
+
+        }
+    }
+
+    private boolean notifyConsumer(final QueueConsumer<?> consumer)
+    {
+        if(_queueConsumerManager.setNotified(consumer, true))
+        {
+            consumer.notifyWork();
+            return true;
+        }
+        else
+        {
+            return false;
         }
     }
 
@@ -316,10 +330,8 @@ public abstract class AbstractQueue<X ex
     {
         super(parentsMap(virtualHost), attributes);
 
-
         _virtualHost = virtualHost;
-        _immediateDeliveryContext = getSystemTaskControllerContext("Immediate 
Delivery", virtualHost.getPrincipal());
-
+        _queueConsumerManager = new QueueConsumerManagerImpl(this);
     }
 
     @Override
@@ -963,7 +975,7 @@ public abstract class AbstractQueue<X ex
         }
         consumer.setQueueContext(queueContext);
 
-        _consumerList.add(consumer);
+        _queueConsumerManager.addConsumer(consumer);
 
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
@@ -987,7 +999,7 @@ public abstract class AbstractQueue<X ex
             throw new NullPointerException("consumer argument is null");
         }
 
-        boolean removed = _consumerList.remove(consumer);
+        boolean removed = _queueConsumerManager.removeConsumer(consumer);
 
         if (removed)
         {
@@ -1041,16 +1053,10 @@ public abstract class AbstractQueue<X ex
     @Override
     public Collection<QueueConsumer<?>> getConsumers()
     {
-        List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
-        ConsumerNodeIterator iter = _consumerList.iterator();
-        while(iter.advance())
-        {
-            consumers.add(iter.getNode().getConsumer());
-        }
-        return consumers;
-
+        return Lists.newArrayList(_queueConsumerManager.getAllIterator());
     }
 
+
     public void resetSubPointersForGroups(QueueConsumer<?> consumer)
     {
         QueueEntry entry = 
_messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
@@ -1062,22 +1068,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    @Override
-    public void resetSubPointersForGroups(final QueueEntry entry)
-    {
-        ConsumerNodeIterator subscriberIter = _consumerList.iterator();
-        // iterate over all the subscribers, and if they are in advance of 
this queue entry then move them backwards
-        while (subscriberIter.advance())
-        {
-            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
-
-            // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues())
-            {
-                updateSubRequeueEntry(sub, entry);
-            }
-        }
-    }
 
     public void addBinding(final Binding<?> binding)
     {
@@ -1209,7 +1199,7 @@ public abstract class AbstractQueue<X ex
             if (entry.isAvailable())
             {
                 checkConsumersNotAheadOfDelivery(entry);
-                notifyAllConsumers();
+                notifyConsumers(entry);
             }
 
             checkForNotificationOnNewMessage(entry.getMessage());
@@ -1337,7 +1327,6 @@ public abstract class AbstractQueue<X ex
 
     private void updateSubRequeueEntry(final QueueConsumer<?> sub, final 
QueueEntry entry)
     {
-
         QueueContext subContext = sub.getQueueContext();
         if(subContext != null)
         {
@@ -1347,20 +1336,33 @@ public abstract class AbstractQueue<X ex
             {
                 if(QueueContext._releasedUpdater.compareAndSet(subContext, 
oldEntry, entry))
                 {
-                    sub.notifyWork();
+                    notifyConsumer(sub);
                     break;
                 }
             }
         }
     }
 
+
+    @Override
+    public void resetSubPointersForGroups(final QueueEntry entry)
+    {
+        resetSubPointers(entry, true);
+    }
+
+    @Override
     public void requeue(QueueEntry entry)
     {
-        ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+        resetSubPointers(entry, false);
+    }
+
+    private void resetSubPointers(final QueueEntry entry, final boolean 
ignoreAvailable)
+    {
+        Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
         // iterate over all the subscribers, and if they are in advance of 
this queue entry then move them backwards
-        while (subscriberIter.advance() && entry.isAvailable())
+        while (consumerIterator.hasNext() && (ignoreAvailable || 
entry.isAvailable()))
         {
-            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+            QueueConsumer<?> sub = consumerIterator.next();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -1370,6 +1372,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
+
     @Override
     public void dequeue(QueueEntry entry)
     {
@@ -1400,7 +1403,7 @@ public abstract class AbstractQueue<X ex
 
     public int getConsumerCount()
     {
-        return _consumerList.size();
+        return _queueConsumerManager.getAllSize();
     }
 
     public int getConsumerCountWithCredit()
@@ -1527,9 +1530,9 @@ public abstract class AbstractQueue<X ex
     /** Used to track bindings to exchanges so that on deletion they can 
easily be cancelled. */
     abstract QueueEntryList getEntries();
 
-    protected QueueConsumerList getConsumerList()
+    protected final QueueConsumerManagerImpl getQueueConsumerManager()
     {
-        return _consumerList;
+        return _queueConsumerManager;
     }
 
     public EventLogger getEventLogger()
@@ -1771,14 +1774,16 @@ public abstract class AbstractQueue<X ex
                 {
                     try
                     {
-                        final ConsumerNodeIterator consumerNodeIterator = 
_consumerList.iterator();
 
-                        while (consumerNodeIterator.advance())
+                        Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
+
+                        while (consumerIterator.hasNext())
                         {
-                            final QueueConsumer s = 
consumerNodeIterator.getNode().getConsumer();
-                            if (s != null)
+                            QueueConsumer<?> consumer = 
consumerIterator.next();
+
+                            if (consumer != null)
                             {
-                                s.queueDeleted();
+                                consumer.queueDeleted();
                             }
                         }
 
@@ -1906,31 +1911,66 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    void notifyAllConsumers()
+    void notifyConsumers(QueueEntry entry)
     {
-        ConsumerNode consumerNode = _consumerList.getHead().findNext();
-        while (consumerNode != null)
+
+        Iterator<QueueConsumer<?>> nonAcquiringIterator = 
_queueConsumerManager.getNonAcquiringIterator();
+        while (nonAcquiringIterator.hasNext())
+        {
+            QueueConsumer<?> consumer = nonAcquiringIterator.next();
+            if(consumer.hasInterest(entry))
+            {
+                notifyConsumer(consumer);
+            }
+        }
+
+        // need to take account of priority and potentially an existing 
notified
+        // we don't want to notify lower priority consumers if there exists a 
consumer in the notified set
+        // which can take the message (implies iterating such that, for each 
priority, you look at interested and then at notified)
+        final Iterator<QueueConsumer<?>> interestedIterator = 
_queueConsumerManager.getInterestedIterator();
+        while (interestedIterator.hasNext())
         {
-            QueueConsumer<?> consumer = consumerNode.getConsumer();
-            if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
+            QueueConsumer<?> consumer = interestedIterator.next();
+            if(consumer.hasInterest(entry) && notifyConsumer(consumer))
             {
-                consumer.notifyWork();
+                break;
             }
-            consumerNode = consumerNode.findNext();
         }
     }
 
-    MessageContainer deliverSingleMessage(QueueConsumer<?> sub)
+    void notifyConsumers()
+    {
+        final Iterator<QueueConsumer<?>> interestedIterator = 
_queueConsumerManager.getInterestedIterator();
+        while (interestedIterator.hasNext())
+        {
+            QueueConsumer<?> consumer = interestedIterator.next();
+
+            if(notifyConsumer(consumer))
+            {
+                break;
+            }
+        }
+    }
+
+
+    MessageContainer deliverSingleMessage(QueueConsumer<?> consumer)
     {
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
 
+        _queueConsumerManager.setNotified(consumer, false);
+
         try
         {
-            if (!sub.isSuspended())
+            if (!consumer.isSuspended())
             {
-                messageContainer = attemptDelivery(sub);
-                if (messageContainer == null && getNextAvailableEntry(sub) == 
null)
+                messageContainer = attemptDelivery(consumer);
+                if(messageContainer != null)
+                {
+                    _queueConsumerManager.setNotified(consumer, true);
+                }
+
+                if (messageContainer == null && 
getNextAvailableEntry(consumer) == null)
                 {
                     queueEmpty = true;
                 }
@@ -1938,20 +1978,27 @@ public abstract class AbstractQueue<X ex
             else
             {
                 // avoid referring old deleted queue entry in 
sub._queueContext._lastSeen
-                getNextAvailableEntry(sub);
+                getNextAvailableEntry(consumer);
             }
         }
         finally
         {
             if(queueEmpty)
             {
-                sub.queueEmpty();
+                consumer.queueEmpty();
             }
 
-            sub.flushBatched();
+            consumer.flushBatched();
 
         }
-
+        if(messageContainer == null && consumer.acquires())
+        {
+            // TODO - Should be only checking for available messages
+            if(!isEmpty())
+            {
+                notifyConsumers();
+            }
+        }
         return messageContainer;
     }
 
@@ -1984,7 +2031,7 @@ public abstract class AbstractQueue<X ex
         QueueEntry node  = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
 
-        if (subActive && (sub.getPriority() == Integer.MAX_VALUE || 
noHigherPriorityWithCredit(sub)))
+        if (subActive && (sub.getPriority() == Integer.MAX_VALUE || 
noHigherPriorityWithCredit(sub, node)))
         {
 
             if (_virtualHost.getState() != State.ACTIVE)
@@ -2034,16 +2081,18 @@ public abstract class AbstractQueue<X ex
         return messageContainer;
     }
 
-    private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
+    boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final 
QueueEntry queueEntry)
     {
-        ConsumerNodeIterator iterator = _consumerList.iterator();
-        while(iterator.advance())
+        // TODO - should iterate over list of interested and notified
+        // *** TODO HERE MONDAY *** - can we simply interleave between 
iterators over the two distinct lists of interested and notified
+        Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
+
+        while (consumerIterator.hasNext())
         {
-            final ConsumerNode node = iterator.getNode();
-            final QueueConsumer consumer = node.getConsumer();
+            QueueConsumer<?> consumer = consumerIterator.next();
             if(consumer.getPriority() > sub.getPriority())
             {
-                if(getNextAvailableEntry(consumer) != null && 
consumer.isNotifyWorkDesired())
+                if(consumer.isNotifyWorkDesired() && 
consumer.hasInterest(queueEntry) && getNextAvailableEntry(consumer) != null)
                 {
                     return false;
                 }
@@ -2053,7 +2102,7 @@ public abstract class AbstractQueue<X ex
     }
 
 
-    private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+    QueueEntry getNextAvailableEntry(final QueueConsumer sub)
     {
         QueueContext context = sub.getQueueContext();
         if(context != null)
@@ -2732,7 +2781,12 @@ public abstract class AbstractQueue<X ex
         switch (getConsumerCount())
         {
             case 1:
-                _exclusiveSubscriber = 
getConsumerList().getHead().getConsumer();
+                Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
+
+                if (consumerIterator.hasNext())
+                {
+                    _exclusiveSubscriber = consumerIterator.next();
+                }
                 // deliberate fall through
             case 0:
                 _exclusiveOwner = null;
@@ -2753,8 +2807,11 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case CONNECTION:
                 AMQSessionModel session = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = 
_queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
+
                     if(session == null)
                     {
                         session = c.getSessionModel();
@@ -2779,8 +2836,10 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case PRINCIPAL:
                 AMQPConnection con = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = 
_queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(con == null)
                     {
                         con = c.getSessionModel().getAMQPConnection();
@@ -2807,8 +2866,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case PRINCIPAL:
                 String containerID = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = 
_queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(containerID == null)
                     {
                         containerID = 
c.getSessionModel().getAMQPConnection().getRemoteContainerName();
@@ -2838,8 +2899,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case CONTAINER:
                 Principal principal = null;
-                for(ConsumerImpl c : getConsumers())
+                Iterator<QueueConsumer<?>> queueConsumerIterator = 
_queueConsumerManager.getAllIterator();
+                while(queueConsumerIterator.hasNext())
                 {
+                    QueueConsumer<?> c = queueConsumerIterator.next();
                     if(principal == null)
                     {
                         principal = 
c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
@@ -2977,7 +3040,7 @@ public abstract class AbstractQueue<X ex
         }
         else if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
-            return (Collection<C>) getConsumers();
+            return (Collection<C>) 
Lists.newArrayList(_queueConsumerManager.getAllIterator());
         }
         else return Collections.emptySet();
     }
@@ -3429,24 +3492,19 @@ public abstract class AbstractQueue<X ex
         @Override
         public void execute()
         {
-
             // if there's (potentially) more than one consumer the others will 
potentially not have been advanced to the
             // next entry they are interested in yet.  This would lead to 
holding on to references to expired messages, etc
             // which would give us memory "leak".
 
-            ConsumerNodeIterator consumerNodeIterator = 
_consumerList.iterator();
-            while (consumerNodeIterator.advance() && !isDeleted())
+            Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
+
+            while (consumerIterator.hasNext() && !isDeleted())
             {
-                ConsumerNode subNode = consumerNodeIterator.getNode();
-                QueueConsumer sub = subNode.getConsumer();
+                QueueConsumer<?> sub = consumerIterator.next();
                 if(sub.acquires())
                 {
                     getNextAvailableEntry(sub);
                 }
-                else
-                {
-                    // TODO
-                }
             }
         }
     }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
 Mon Nov 14 09:59:26 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -36,10 +37,12 @@ public abstract class OutOfOrderQueue<X
     protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
         // check that all consumers are not in advance of the entry
-        ConsumerNodeIterator subIter = getConsumerList().iterator();
-        while(subIter.advance() && !entry.isAcquired())
+        Iterator<QueueConsumer<?>> consumerIterator = 
getQueueConsumerManager().getAllIterator();
+
+        while (consumerIterator.hasNext() && !entry.isAcquired())
         {
-            final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
+            QueueConsumer<?> consumer = consumerIterator.next();
+
             if(!consumer.isClosed())
             {
                 QueueContext context = consumer.getQueueContext();

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
 Mon Nov 14 09:59:26 2016
@@ -38,6 +38,8 @@ public interface QueueConsumer<X extends
 
     void acquisitionRemoved(QueueEntry node);
 
+    QueueConsumerNode getQueueConsumerNode();
+
     void queueDeleted();
 
     Queue<?> getQueue();
@@ -51,4 +53,6 @@ public interface QueueConsumer<X extends
     boolean isNotifyWorkDesired();
 
     void notifyWork();
+
+    void setQueueConsumerNode(QueueConsumerNode node);
 }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 Mon Nov 14 09:59:26 2016
@@ -97,6 +97,8 @@ class QueueConsumerImpl
     @ManagedAttributeField
     private int _priority;
 
+    private QueueConsumerNode _queueConsumerNode;
+
     QueueConsumerImpl(final AbstractQueue<?> queue,
                       ConsumerTarget target,
                       final String consumerName,
@@ -257,6 +259,18 @@ class QueueConsumerImpl
         _target.notifyWork();
     }
 
+    @Override
+    public void setQueueConsumerNode(final QueueConsumerNode node)
+    {
+        _queueConsumerNode = node;
+    }
+
+    @Override
+    public QueueConsumerNode getQueueConsumerNode()
+    {
+        return _queueConsumerNode;
+    }
+
     public void queueDeleted()
     {
         _target.consumerRemoved(this);

Added: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769585&view=auto
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
 (added)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
 Mon Nov 14 09:59:26 2016
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Iterator;
+
+public interface QueueConsumerManager
+{
+    void addConsumer(QueueConsumer<?> consumer);
+    boolean removeConsumer(QueueConsumer<?> consumer);
+    /*public*/ void setInterest(QueueConsumer<?> consumer, boolean 
interested); // called from Consumer
+    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean 
notified); // called from Queue
+
+    // should be priority and then insertion order
+    Iterator<QueueConsumer<?>> getInterestedIterator();
+
+    Iterator<QueueConsumer<?>> getAllIterator();
+    Iterator<QueueConsumer<?>> getNonAcquiringIterator();
+
+    Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator();
+
+    int getAllSize();
+    //        int getInterestedSize();
+    int getNotifiedAcquiringSize();
+
+
+}

Propchange: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769585&view=auto
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 (added)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 Mon Nov 14 09:59:26 2016
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class QueueConsumerManagerImpl implements QueueConsumerManager
+{
+    private final AbstractQueue<?> _queue;
+
+    private final ConcurrentLinkedDeque<QueueConsumerNode> _interested = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
+    private final ConcurrentLinkedDeque<QueueConsumerNode> _notInterested = 
new ConcurrentLinkedDeque<QueueConsumerNode>();
+    private final ConcurrentLinkedDeque<QueueConsumerNode> _notified = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
+    private final ConcurrentLinkedDeque<QueueConsumerNode> _nonAcquiring = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
+
+    private final Set<QueueConsumer<?>> _allConsumers = 
Collections.newSetFromMap(new ConcurrentHashMap<QueueConsumer<?>, Boolean>());
+
+    private volatile int _count;
+
+    public QueueConsumerManagerImpl(final AbstractQueue<?> queue)
+    {
+        _queue = queue;
+    }
+
+    // Always in the config thread
+    @Override
+    public void addConsumer(final QueueConsumer<?> consumer)
+    {
+        _allConsumers.add(consumer);
+        QueueConsumerNode node = new QueueConsumerNode(consumer);
+        consumer.setQueueConsumerNode(node);
+        if(consumer.isNotifyWorkDesired())
+        {
+            if (consumer.acquires())
+            {
+                _interested.add(node);
+            }
+            else
+            {
+                _nonAcquiring.add(node);
+            }
+        }
+        else
+        {
+            _notInterested.add(node);
+        }
+        _count++;
+    }
+
+    // Always in the config thread
+    @Override
+    public boolean removeConsumer(final QueueConsumer<?> consumer)
+    {
+        _allConsumers.remove(consumer);
+        QueueConsumerNode node = consumer.getQueueConsumerNode();
+        while(!node.isRemoved())
+        {
+            boolean removed = _notInterested.remove(node);
+            if(!removed)
+            {
+                if(consumer.acquires())
+                {
+                    removed = _interested.remove(node);
+                    removed = removed || _notified.remove(node);
+                }
+                else
+                {
+                    removed = _nonAcquiring.remove(node);
+                }
+            }
+
+            if(removed)
+            {
+                node.setRemoved();
+                _count--;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Set by the consumer always in the IO thread
+    @Override
+    public void setInterest(final QueueConsumer consumer, final boolean 
interested)
+    {
+        QueueConsumerNode node = consumer.getQueueConsumerNode();
+        if(interested)
+        {
+            if(_notInterested.remove(node))
+            {
+                if(consumer.acquires())
+                {
+                    _interested.add(node);
+                }
+                else
+                {
+                    _nonAcquiring.add(node);
+                }
+            }
+        }
+        else
+        {
+            if(consumer.acquires())
+            {
+                while(!node.isRemoved())
+                {
+                    if(_interested.remove(node) || _notified.remove(node))
+                    {
+                        _notInterested.add(node);
+                        break;
+                    }
+                }
+            }
+            else
+            {
+                if(_nonAcquiring.remove(node))
+                {
+                    _notInterested.add(node);
+                }
+            }
+        }
+    }
+
+    // Set by the Queue any IO thread
+    @Override
+    public boolean setNotified(final QueueConsumer consumer, final boolean 
notified)
+    {
+        QueueConsumerNode node = consumer.getQueueConsumerNode();
+        if(consumer.acquires())
+        {
+            if(notified)
+            {
+                // TODO - Fix responsibility
+                QueueEntry queueEntry;
+                if((queueEntry = _queue.getNextAvailableEntry(consumer)) != 
null
+                   && _queue.noHigherPriorityWithCredit(consumer, queueEntry)
+                   && _interested.remove(node))
+                {
+                    _notified.add(node);
+                    return true;
+                }
+                else
+                {
+                    return false;
+                }
+            }
+            else
+            {
+                if(_notified.remove(node))
+                {
+                    _interested.add(node);
+                    return true;
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    @Override
+    public Iterator<QueueConsumer<?>> getInterestedIterator()
+    {
+        return new QueueConsumerIterator(_interested.iterator());
+    }
+
+    @Override
+    public Iterator<QueueConsumer<?>> getAllIterator()
+    {
+        return _allConsumers.iterator();
+    }
+
+    @Override
+    public Iterator<QueueConsumer<?>> getNonAcquiringIterator()
+    {
+        return new QueueConsumerIterator(_nonAcquiring.iterator());
+    }
+
+    @Override
+    public Iterator<QueueConsumer<?>> 
getPrioritySortedNotifiedOrInterestedIterator()
+    {
+        return null;
+    }
+
+    @Override
+    public int getAllSize()
+    {
+        return _count;
+    }
+
+    @Override
+    public int getNotifiedAcquiringSize()
+    {
+        return _notified.size();
+    }
+
+    private static class QueueConsumerIterator implements 
Iterator<QueueConsumer<?>>
+    {
+        private final Iterator<QueueConsumerNode> _underlying;
+
+        private QueueConsumerIterator(final Iterator<QueueConsumerNode> 
underlying)
+        {
+            _underlying = underlying;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return _underlying.hasNext();
+        }
+
+        @Override
+        public QueueConsumer<?> next()
+        {
+            return _underlying.next().getQueueConsumer();
+        }
+
+        @Override
+        public void remove()
+        {
+            _underlying.remove();
+        }
+    }
+}

Propchange: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1769585&view=auto
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 (added)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 Mon Nov 14 09:59:26 2016
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+final class QueueConsumerNode // per-consumer handle that QueueConsumerManagerImpl stores in its state deques
+{
+    private final AtomicBoolean _deleted = new AtomicBoolean(); // logical-delete flag used by findNext(); flipped once by delete()
+    private final AtomicReference<QueueConsumerNode> _next = new 
AtomicReference<>(); // successor link; only ever set from null via setNext() or advanced by findNext()
+
+    private final QueueConsumer<?> _queueConsumer; // null for nodes built with the no-arg constructor
+    private volatile boolean _removed; // set by QueueConsumerManagerImpl.removeConsumer() once the node has left its deque
+    private QueueConsumerNodeListEntry _listEntry; // back-reference assigned by the QueueConsumerNodeListEntry constructor
+
+    QueueConsumerNode(final QueueConsumer<?> queueConsumer)
+    {
+        _queueConsumer = queueConsumer;
+    }
+
+    public QueueConsumerNode()
+    {
+        //used for sentinel head and dummy node construction
+        _queueConsumer = null;
+        _deleted.set(true); // born deleted, so findNext() never returns such a node
+    }
+
+    public QueueConsumer<?> getQueueConsumer()
+    {
+        return _queueConsumer;
+    }
+
+    public boolean isRemoved()
+    {
+        return _removed;
+    }
+
+    public void setRemoved()
+    {
+        _removed = true;
+    }
+
+
+
+    /**
+     * Retrieves the first non-deleted node following the current node.
+     * Any deleted non-tail nodes encountered during the search are unlinked.
+     *
+     * @return the next non-deleted node, or null if none was found.
+     */
+    public QueueConsumerNode findNext()
+    {
+        QueueConsumerNode next = nextNode();
+        while(next != null && next.isDeleted())
+        {
+            final QueueConsumerNode newNext = next.nextNode();
+            if(newNext != null)
+            {
+                //try to move our _next reference forward to the 'newNext'
+                //node to unlink the deleted node
+                _next.compareAndSet(next, newNext);
+                next = nextNode(); // re-read: the CAS may have lost to another thread
+            }
+            else
+            {
+                //'newNext' is null, meaning 'next' is the current tail. Can't 
unlink
+                //the tail node for thread safety reasons, just use the null.
+                next = null;
+            }
+        }
+
+        return next;
+    }
+
+    /**
+     * Gets the immediately next referenced node in the structure.
+     *
+     * @return the immediately next node in the structure, or null if at the 
tail.
+     */
+    protected QueueConsumerNode nextNode()
+    {
+        return _next.get();
+    }
+
+    /**
+     * Used to initialise the 'next' reference. Will only succeed if the 
reference was not previously set.
+     *
+     * @param node the QueueConsumerNode to set as 'next'
+     * @return whether the operation succeeded
+     */
+    boolean setNext(final QueueConsumerNode node)
+    {
+        return _next.compareAndSet(null, node);
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted.get();
+    }
+
+    public boolean delete()
+    {
+        return _deleted.compareAndSet(false,true); // true only for the one caller that performs the deletion
+    }
+
+    public void setListEntry(final QueueConsumerNodeListEntry listEntry)
+    {
+        _listEntry = listEntry;
+    }
+
+    public QueueConsumerNodeListEntry getListEntry()
+    {
+        return _listEntry; // null until a QueueConsumerNodeListEntry has been created for this node
+    }
+}

Propchange: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
 (from r1769267, 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java)
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java&r1=1769267&r2=1769585&rev=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
 Mon Nov 14 09:59:26 2016
@@ -20,24 +20,51 @@
  */
 package org.apache.qpid.server.queue;
 
-public class ConsumerNodeIterator
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class QueueConsumerNodeIterator implements Iterator<QueueConsumerNode> // walks a QueueConsumerNodeList from its head, skipping deleted entries
 {
-    private ConsumerNode _lastNode;
+    private final QueueConsumerNodeList _queueConsumerNodeList; // list that remove() delegates to
+    private QueueConsumerNodeListEntry _previous; // entry last returned by next(); initially the list head
+    private QueueConsumerNodeListEntry _next; // look-ahead filled in by hasNext(); null when nothing is cached
 
-    ConsumerNodeIterator(ConsumerNode startNode)
+    QueueConsumerNodeIterator(QueueConsumerNodeList list)
     {
-        _lastNode = startNode;
+        _previous = list.getHead();
+        _queueConsumerNodeList = list;
     }
 
-    public ConsumerNode getNode()
+    @Override
+    public boolean hasNext()
     {
-        return _lastNode;
+        _next = _previous.findNext(); // findNext() skips entries flagged as deleted
+        return _next != null;
     }
 
-    public boolean advance()
+    @Override
+    public QueueConsumerNode next()
     {
-        _lastNode = _lastNode.findNext();
+        if(_next == null) // hasNext() was not called, or found nothing last time
+        {
+            _next = _previous.findNext();
+        }
+        if(_next == null)
+        {
+            throw new NoSuchElementException();
+        }
+        _previous = _next;
+        _next = null; // forces a fresh look-ahead on the following call
+        return _previous.getQueueConsumerNode();
+    }
 
-        return _lastNode != null;
+    @Override
+    public void remove()
+    {
+        if(_previous.isDeleted()) // true for the head (created deleted, i.e. next() not yet called) and for an entry already removed
+        {
+            throw new IllegalStateException();
+        }
+        _queueConsumerNodeList.removeEntry(_previous);
     }
 }

Copied: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
 (from r1769267, 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java)
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java&r1=1769267&r2=1769585&rev=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
 Mon Nov 14 09:59:26 2016
@@ -23,20 +23,19 @@ package org.apache.qpid.server.queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class QueueConsumerList
+class QueueConsumerNodeList
 {
-    private final ConsumerNode _head = new ConsumerNode();
+    private final QueueConsumerNodeListEntry _head = new 
QueueConsumerNodeListEntry();
 
-    private final AtomicReference<ConsumerNode> _tail = new 
AtomicReference<ConsumerNode>(_head);
-    private final AtomicReference<ConsumerNode> _subNodeMarker = new 
AtomicReference<ConsumerNode>(_head);
+    private final AtomicReference<QueueConsumerNodeListEntry> _tail = new 
AtomicReference<>(_head);
     private final AtomicInteger _size = new AtomicInteger();
 
-    private void insert(final ConsumerNode node, final boolean count)
+    private void insert(final QueueConsumerNodeListEntry node, final boolean 
count)
     {
         for (;;)
         {
-            ConsumerNode tail = _tail.get();
-            ConsumerNode next = tail.nextNode();
+            QueueConsumerNodeListEntry tail = _tail.get();
+            QueueConsumerNodeListEntry next = tail.nextNode();
             if (tail == _tail.get())
             {
                 if (next == null)
@@ -59,103 +58,36 @@ class QueueConsumerList
         }
     }
 
-    public void add(final QueueConsumer<?> sub)
+    public void add(final QueueConsumerNode node)
     {
-        ConsumerNode node = new ConsumerNode(sub);
-        insert(node, true);
+        QueueConsumerNodeListEntry entry = new 
QueueConsumerNodeListEntry(node);
+        insert(entry, true);
     }
 
-    public boolean remove(final QueueConsumer<?> sub)
+    public boolean remove(final QueueConsumerNode consumerNode)
     {
-        ConsumerNode prevNode = _head;
-        ConsumerNode node = _head.nextNode();
-
-        while(node != null)
-        {
-            if(sub.equals(node.getConsumer()) && node.delete())
-            {
-                _size.decrementAndGet();
-
-                ConsumerNode tail = _tail.get();
-                if(node == tail)
-                {
-                    //we cant remove the last node from the structure for
-                    //correctness reasons, however we have just 'deleted'
-                    //the tail. Inserting an empty dummy node after it will
-                    //let us scavenge the node containing the Consumer.
-                    insert(new ConsumerNode(), false);
-                }
-
-                //advance the next node reference in the 'prevNode' to scavenge
-                //the newly 'deleted' node for the Consumer.
-                prevNode.findNext();
-
-                nodeMarkerCleanup(node);
-
-                return true;
-            }
-
-            prevNode = node;
-            node = node.findNext();
-        }
-
-        return false;
+        return removeEntry(consumerNode.getListEntry());
     }
 
-    private void nodeMarkerCleanup(final ConsumerNode node)
+    boolean removeEntry(final QueueConsumerNodeListEntry entry)
     {
-        ConsumerNode markedNode = _subNodeMarker.get();
-        if(node == markedNode)
+        if (entry.delete())
         {
-            //if the marked node is the one we are removing, then
-            //replace it with a dummy pointing at the next node.
-            //this is OK as the marked node is only used to index
-            //into the list and find the next node to use.
-            //Because we inserted a dummy if node was the
-            //tail, markedNode.nextNode() can never be null.
-            ConsumerNode dummy = new ConsumerNode();
-            dummy.setNext(markedNode.nextNode());
-
-            //if the CAS fails the marked node has changed, thus
-            //we don't care about the dummy and just forget it
-            _subNodeMarker.compareAndSet(markedNode, dummy);
+            _size.decrementAndGet();
+            return true;
         }
-        else if(markedNode != null)
+        else
         {
-            //if the marked node was already deleted then it could
-            //hold subsequently removed nodes after it in the list 
-            //in memory. Scavenge it to ensure their actual removal.
-            if(markedNode != _head && markedNode.isDeleted())
-            {
-                markedNode.findNext();
-            }
+            return false;
         }
     }
 
-    public boolean updateMarkedNode(final ConsumerNode expected, final 
ConsumerNode nextNode)
-    {
-        return _subNodeMarker.compareAndSet(expected, nextNode);
-    }
-
-    /**
-     * Get the current marked ConsumerNode. This should only be used only to 
index into the list and find the next node
-     * after the mark, since if the previously marked node was subsequently 
deleted the item returned may be a dummy node
-     * with reference to the next node.
-     *
-     * @return the previously marked node (or a dummy if it was subsequently 
deleted)
-     */
-    public ConsumerNode getMarkedNode()
-    {
-        return _subNodeMarker.get();
-    }
-
-
-    public ConsumerNodeIterator iterator()
+    public QueueConsumerNodeIterator iterator()
     {
-        return new ConsumerNodeIterator(_head);
+        return new QueueConsumerNodeIterator(this);
     }
 
-    public ConsumerNode getHead()
+    public QueueConsumerNodeListEntry getHead()
     {
         return _head;
     }

Added: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java?rev=1769585&view=auto
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
 (added)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
 Mon Nov 14 09:59:26 2016
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+final class QueueConsumerNodeListEntry // link cell of QueueConsumerNodeList wrapping one QueueConsumerNode
+{
+    private final AtomicBoolean _deleted = new AtomicBoolean(); // logical-delete flag; flipped once by delete()
+    private final AtomicReference<QueueConsumerNodeListEntry> _next = new 
AtomicReference<>(); // successor link; set from null via setNext() or advanced past deleted entries by findNext()
+
+    private final QueueConsumerNode _queueConsumerNode; // null for the sentinel/dummy entries built by the no-arg constructor
+    private volatile boolean _removed; // NOTE(review): no reader or writer of this flag is visible in this change - confirm it is needed
+
+    QueueConsumerNodeListEntry(final QueueConsumerNode queueConsumerNode)
+    {
+        _queueConsumerNode = queueConsumerNode;
+        queueConsumerNode.setListEntry(this); // lets QueueConsumerNodeList.remove() go from node to entry without a scan
+    }
+
+    public QueueConsumerNodeListEntry()
+    {
+        //used for sentinel head and dummy node construction
+        _queueConsumerNode = null;
+        _deleted.set(true); // born deleted, so findNext() never returns such an entry
+    }
+
+    public QueueConsumerNode getQueueConsumerNode()
+    {
+        return _queueConsumerNode;
+    }
+
+    public boolean isRemoved()
+    {
+        return _removed;
+    }
+
+    public void setRemoved()
+    {
+        _removed = true;
+    }
+
+
+
+    /**
+     * Retrieves the first non-deleted node following the current node.
+     * Any deleted non-tail nodes encountered during the search are unlinked.
+     *
+     * @return the next non-deleted node, or null if none was found.
+     */
+    public QueueConsumerNodeListEntry findNext()
+    {
+        QueueConsumerNodeListEntry next = nextNode();
+        while(next != null && next.isDeleted())
+        {
+            final QueueConsumerNodeListEntry newNext = next.nextNode();
+            if(newNext != null)
+            {
+                //try to move our _next reference forward to the 'newNext'
+                //node to unlink the deleted node
+                _next.compareAndSet(next, newNext);
+                next = nextNode(); // re-read: the CAS may have lost to another thread
+            }
+            else
+            {
+                //'newNext' is null, meaning 'next' is the current tail. Can't 
unlink
+                //the tail node for thread safety reasons, just use the null.
+                next = null;
+            }
+        }
+
+        return next;
+    }
+
+    /**
+     * Gets the immediately next referenced node in the structure.
+     *
+     * @return the immediately next node in the structure, or null if at the 
tail.
+     */
+    protected QueueConsumerNodeListEntry nextNode()
+    {
+        return _next.get();
+    }
+
+    /**
+     * Used to initialise the 'next' reference. Will only succeed if the 
reference was not previously set.
+     *
+     * @param node the QueueConsumerNodeListEntry to set as 'next'
+     * @return whether the operation succeeded
+     */
+    boolean setNext(final QueueConsumerNodeListEntry node)
+    {
+        return _next.compareAndSet(null, node);
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted.get();
+    }
+
+    public boolean delete()
+    {
+        return _deleted.compareAndSet(false,true); // true only for the one caller that performs the deletion
+    }
+
+}

Propchange: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
 Mon Nov 14 09:59:26 2016
@@ -1391,7 +1391,6 @@ public class AMQPConnection_0_8Impl
         final Action<ProtocolEngine> listener = _workListener.get();
         if(listener != null)
         {
-
             listener.performAction(this);
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to