Author: lquack
Date: Mon Nov 14 15:36:58 2016
New Revision: 1769654

URL: http://svn.apache.org/viewvc?rev=1769654&view=rev
Log:
Use own data structure in QueueConsumerManager and take consumer priority into account

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
 Mon Nov 14 15:36:58 2016
@@ -90,6 +90,11 @@ public interface Queue<X extends Queue<X
     @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
     long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
 
+    String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
+    @ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
+    int DEFAULT_QUEUE_SCAVANGE_COUNT = 50;
+
+
     String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
     @SuppressWarnings("unused")
     @ManagedContextDefault(name = MIME_TYPE_TO_FILE_EXTENSION, description = 
"A mapping of MIME types to file extensions.")

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Mon Nov 14 15:36:58 2016
@@ -147,7 +147,7 @@ public abstract class AbstractQueue<X ex
     private final QueueManagingVirtualHost<?> _virtualHost;
     private final DeletedChildListener _deletedChildListener = new 
DeletedChildListener();
 
-    private final QueueConsumerManagerImpl _queueConsumerManager;
+    private QueueConsumerManagerImpl _queueConsumerManager;
 
     @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = 
"postSetAlternateExchange")
     private Exchange _alternateExchange;
@@ -331,7 +331,6 @@ public abstract class AbstractQueue<X ex
         super(parentsMap(virtualHost), attributes);
 
         _virtualHost = virtualHost;
-        _queueConsumerManager = new QueueConsumerManagerImpl(this);
     }
 
     @Override
@@ -397,6 +396,7 @@ public abstract class AbstractQueue<X ex
 
         _arguments = Collections.synchronizedMap(arguments);
 
+        _queueConsumerManager = new QueueConsumerManagerImpl(this);
         _logSubject = new QueueLogSubject(this);
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = 
Subject.getSubject(AccessController.getContext());
@@ -1924,7 +1924,7 @@ public abstract class AbstractQueue<X ex
             }
         }
 
-        // need to take account of priority and potentially and existing 
notified
+        // TODO need to take account of priority and potentially and existing 
notified
         // we don't want to notify lower priority consumers if there exists a 
consumer in the notified set
         // which can take the message (implies iterating such that you look at 
for each priority look at interested then at notified)
         final Iterator<QueueConsumer<?>> interestedIterator = 
_queueConsumerManager.getInterestedIterator();
@@ -2031,7 +2031,7 @@ public abstract class AbstractQueue<X ex
         QueueEntry node  = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
 
-        if (subActive && (sub.getPriority() == Integer.MAX_VALUE || 
noHigherPriorityWithCredit(sub, node)))
+        if (node != null && subActive && (sub.getPriority() == 
Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
         {
 
             if (_virtualHost.getState() != State.ACTIVE)
@@ -2040,7 +2040,7 @@ public abstract class AbstractQueue<X ex
                                                            "virtualhost state 
" + _virtualHost.getState());
             }
 
-            if (node != null && node.isAvailable())
+            if (node.isAvailable())
             {
                 if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
@@ -2083,8 +2083,6 @@ public abstract class AbstractQueue<X ex
 
     boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final 
QueueEntry queueEntry)
     {
-        // TODO - should iterate over list of interested and notified
-        // *** TODO HERE MONDAY *** - can we simply interleave bewteen 
iterators over the two distinct lists of interested and notified
         Iterator<QueueConsumer<?>> consumerIterator = 
_queueConsumerManager.getAllIterator();
 
         while (consumerIterator.hasNext())
@@ -2097,6 +2095,10 @@ public abstract class AbstractQueue<X ex
                     return false;
                 }
             }
+            else
+            {
+                break;
+            }
         }
         return true;
     }
@@ -3040,7 +3042,9 @@ public abstract class AbstractQueue<X ex
         }
         else if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
-            return (Collection<C>) 
Lists.newArrayList(_queueConsumerManager.getAllIterator());
+            return _queueConsumerManager == null
+                    ? Collections.<C>emptySet()
+                    : (Collection<C>) 
Lists.newArrayList(_queueConsumerManager.getAllIterator());
         }
         else return Collections.emptySet();
     }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
 Mon Nov 14 15:36:58 2016
@@ -20,6 +20,8 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -47,13 +49,14 @@ public abstract class OrderedQueueEntryL
                 _nextUpdater = OrderedQueueEntry._nextUpdater;
 
     private AtomicLong _scavenges = new AtomicLong(0L);
-    private final long _scavengeCount = 
Integer.getInteger("qpid.queue.scavenge_count", 50);
+    private final long _scavengeCount;
     private final AtomicReference<QueueEntry> _unscavengedHWM = new 
AtomicReference<QueueEntry>();
 
 
     public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
     {
         _queue = queue;
+        _scavengeCount = _queue.getContextValue(Integer.class, 
QUEUE_SCAVANGE_COUNT);
         _head = headCreator.createHead(this);
         _tail = _head;
     }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 Mon Nov 14 15:36:58 2016
@@ -20,51 +20,66 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class QueueConsumerManagerImpl implements QueueConsumerManager
 {
     private final AbstractQueue<?> _queue;
 
-    private final ConcurrentLinkedDeque<QueueConsumerNode> _interested = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
-    private final ConcurrentLinkedDeque<QueueConsumerNode> _notInterested = 
new ConcurrentLinkedDeque<QueueConsumerNode>();
-    private final ConcurrentLinkedDeque<QueueConsumerNode> _notified = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
-    private final ConcurrentLinkedDeque<QueueConsumerNode> _nonAcquiring = new 
ConcurrentLinkedDeque<QueueConsumerNode>();
+    private final List<PriorityConsumerListPair> _interested;
+    private final QueueConsumerNodeList _notInterested;
+    private final List<PriorityConsumerListPair> _notified;
+    private final QueueConsumerNodeList _nonAcquiring;
 
-    private final Set<QueueConsumer<?>> _allConsumers = 
Collections.newSetFromMap(new ConcurrentHashMap<QueueConsumer<?>, Boolean>());
+    private final List<PriorityConsumerListPair> _allConsumers;
 
     private volatile int _count;
 
+    enum NodeState
+    {
+        REMOVED,
+        INTERESTED,
+        NOT_INTERESTED,
+        NOTIFIED,
+        NON_ACQUIRING;
+    }
+
     public QueueConsumerManagerImpl(final AbstractQueue<?> queue)
     {
         _queue = queue;
+        _notInterested = new QueueConsumerNodeList(queue);
+        _interested = new CopyOnWriteArrayList<>();
+        _notified = new CopyOnWriteArrayList<>();
+        _nonAcquiring = new QueueConsumerNodeList(queue);
+        _allConsumers = new CopyOnWriteArrayList<>();
     }
 
     // Always in the config thread
     @Override
     public void addConsumer(final QueueConsumer<?> consumer)
     {
-        _allConsumers.add(consumer);
-        QueueConsumerNode node = new QueueConsumerNode(consumer);
+        QueueConsumerNode node = new QueueConsumerNode(this, consumer);
+        addToAll(node);
+
         consumer.setQueueConsumerNode(node);
-        if(consumer.isNotifyWorkDesired())
+        if (consumer.isNotifyWorkDesired())
         {
             if (consumer.acquires())
             {
-                _interested.add(node);
+                node.moveFromTo(NodeState.REMOVED, NodeState.INTERESTED);
             }
             else
             {
-                _nonAcquiring.add(node);
+                node.moveFromTo(NodeState.REMOVED, NodeState.NON_ACQUIRING);
             }
         }
         else
         {
-            _notInterested.add(node);
+            node.moveFromTo(NodeState.REMOVED, NodeState.NOT_INTERESTED);
         }
         _count++;
     }
@@ -73,30 +88,13 @@ public class QueueConsumerManagerImpl im
     @Override
     public boolean removeConsumer(final QueueConsumer<?> consumer)
     {
-        _allConsumers.remove(consumer);
+        removeFromAll(consumer);
         QueueConsumerNode node = consumer.getQueueConsumerNode();
-        while(!node.isRemoved())
-        {
-            boolean removed = _notInterested.remove(node);
-            if(!removed)
-            {
-                if(consumer.acquires())
-                {
-                    removed = _interested.remove(node);
-                    removed = removed || _notified.remove(node);
-                }
-                else
-                {
-                    removed = _nonAcquiring.remove(node);
-                }
-            }
 
-            if(removed)
-            {
-                node.setRemoved();
-                _count--;
-                return true;
-            }
+        if 
(node.moveFromTo(EnumSet.complementOf(EnumSet.of(NodeState.REMOVED)), 
NodeState.REMOVED))
+        {
+            _count--;
+            return true;
         }
         return false;
     }
@@ -106,39 +104,26 @@ public class QueueConsumerManagerImpl im
     public void setInterest(final QueueConsumer consumer, final boolean 
interested)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
-        if(interested)
+        if (interested)
         {
-            if(_notInterested.remove(node))
+            if (consumer.acquires())
             {
-                if(consumer.acquires())
-                {
-                    _interested.add(node);
-                }
-                else
-                {
-                    _nonAcquiring.add(node);
-                }
+                node.moveFromTo(NodeState.NOT_INTERESTED, 
NodeState.INTERESTED);
+            }
+            else
+            {
+                node.moveFromTo(NodeState.NOT_INTERESTED, 
NodeState.NON_ACQUIRING);
             }
         }
         else
         {
-            if(consumer.acquires())
+            if (consumer.acquires())
             {
-                while(!node.isRemoved())
-                {
-                    if(_interested.remove(node) || _notified.remove(node))
-                    {
-                        _notInterested.add(node);
-                        break;
-                    }
-                }
+                node.moveFromTo(EnumSet.of(NodeState.INTERESTED, 
NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
             }
             else
             {
-                if(_nonAcquiring.remove(node))
-                {
-                    _notInterested.add(node);
-                }
+                node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING), 
NodeState.NOT_INTERESTED);
             }
         }
     }
@@ -148,18 +133,16 @@ public class QueueConsumerManagerImpl im
     public boolean setNotified(final QueueConsumer consumer, final boolean 
notified)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
-        if(consumer.acquires())
+        if (consumer.acquires())
         {
-            if(notified)
+            if (notified)
             {
                 // TODO - Fix responsibility
                 QueueEntry queueEntry;
-                if((queueEntry = _queue.getNextAvailableEntry(consumer)) != 
null
-                   && _queue.noHigherPriorityWithCredit(consumer, queueEntry)
-                   && _interested.remove(node))
+                if ((queueEntry = _queue.getNextAvailableEntry(consumer)) != 
null
+                    && _queue.noHigherPriorityWithCredit(consumer, queueEntry))
                 {
-                    _notified.add(node);
-                    return true;
+                    return node.moveFromTo(NodeState.INTERESTED, 
NodeState.NOTIFIED);
                 }
                 else
                 {
@@ -168,15 +151,7 @@ public class QueueConsumerManagerImpl im
             }
             else
             {
-                if(_notified.remove(node))
-                {
-                    _interested.add(node);
-                    return true;
-                }
-                else
-                {
-                    return false;
-                }
+                return node.moveFromTo(NodeState.NOTIFIED, 
NodeState.INTERESTED);
             }
         }
         else
@@ -188,13 +163,13 @@ public class QueueConsumerManagerImpl im
     @Override
     public Iterator<QueueConsumer<?>> getInterestedIterator()
     {
-        return new QueueConsumerIterator(_interested.iterator());
+        return new QueueConsumerIterator(new 
PrioritisedQueueConsumerNodeIterator(_interested));
     }
 
     @Override
     public Iterator<QueueConsumer<?>> getAllIterator()
     {
-        return _allConsumers.iterator();
+        return new QueueConsumerIterator(new 
PrioritisedQueueConsumerNodeIterator(_allConsumers));
     }
 
     @Override
@@ -221,6 +196,46 @@ public class QueueConsumerManagerImpl im
         return _notified.size();
     }
 
+    QueueConsumerNodeListEntry addNodeToInterestList(final QueueConsumerNode 
queueConsumerNode)
+    {
+        QueueConsumerNodeListEntry newListEntry;
+        switch (queueConsumerNode.getState())
+        {
+            case INTERESTED:
+                newListEntry = null;
+                for (PriorityConsumerListPair pair : _interested)
+                {
+                    if (pair._priority == 
queueConsumerNode.getQueueConsumer().getPriority())
+                    {
+                        newListEntry = pair._consumers.add(queueConsumerNode);
+                        break;
+                    }
+                }
+                break;
+            case NOT_INTERESTED:
+                newListEntry = _notInterested.add(queueConsumerNode);
+                break;
+            case NOTIFIED:
+                newListEntry = null;
+                for (PriorityConsumerListPair pair : _notified)
+                {
+                    if (pair._priority == 
queueConsumerNode.getQueueConsumer().getPriority())
+                    {
+                        newListEntry = pair._consumers.add(queueConsumerNode);
+                        break;
+                    }
+                }
+                break;
+            case NON_ACQUIRING:
+                newListEntry = _nonAcquiring.add(queueConsumerNode);
+                break;
+            default:
+                newListEntry = null;
+                break;
+        }
+        return newListEntry;
+    }
+
     private static class QueueConsumerIterator implements 
Iterator<QueueConsumer<?>>
     {
         private final Iterator<QueueConsumerNode> _underlying;
@@ -248,4 +263,120 @@ public class QueueConsumerManagerImpl im
             _underlying.remove();
         }
     }
+
+    private void addToAll(final QueueConsumerNode consumerNode)
+    {
+        int consumerPriority = consumerNode.getQueueConsumer().getPriority();
+        int i;
+        for (i = 0; i < _allConsumers.size(); ++i)
+        {
+            final PriorityConsumerListPair priorityConsumerListPair = 
_allConsumers.get(i);
+            if (priorityConsumerListPair._priority == consumerPriority)
+            {
+                final QueueConsumerNodeListEntry entry = 
priorityConsumerListPair._consumers.add(consumerNode);
+                consumerNode.setAllEntry(entry);
+                return;
+            }
+            else if (priorityConsumerListPair._priority < consumerPriority)
+            {
+                break;
+            }
+        }
+
+        PriorityConsumerListPair newPriorityConsumerListPair = new 
PriorityConsumerListPair(consumerPriority);
+        final QueueConsumerNodeListEntry entry = 
newPriorityConsumerListPair._consumers.add(consumerNode);
+        consumerNode.setAllEntry(entry);
+        _allConsumers.add(i, newPriorityConsumerListPair);
+        _notified.add(i, new PriorityConsumerListPair(consumerPriority));
+        _interested.add(i, new PriorityConsumerListPair(consumerPriority));
+    }
+
+    private void removeFromAll(final QueueConsumer<?> consumer)
+    {
+        final QueueConsumerNode node = consumer.getQueueConsumerNode();
+        int consumerPriority = consumer.getPriority();
+        for (int i = 0; i < _allConsumers.size(); ++i)
+        {
+            final PriorityConsumerListPair priorityConsumerListPair = 
_allConsumers.get(i);
+            if (priorityConsumerListPair._priority == consumerPriority)
+            {
+                
priorityConsumerListPair._consumers.removeEntry(node.getAllEntry());
+                if (priorityConsumerListPair._consumers.isEmpty())
+                {
+                    _allConsumers.remove(i);
+                    _notified.remove(i);
+                    _interested.remove(i);
+                }
+                return;
+            }
+            else if (priorityConsumerListPair._priority < consumerPriority)
+            {
+                break;
+            }
+        }
+    }
+
+
+    private class PriorityConsumerListPair
+    {
+        final int _priority;
+        final QueueConsumerNodeList _consumers;
+
+        private PriorityConsumerListPair(final int priority)
+        {
+            _priority = priority;
+            _consumers = new QueueConsumerNodeList(_queue);
+        }
+    }
+
+    private class PrioritisedQueueConsumerNodeIterator implements 
Iterator<QueueConsumerNode>
+    {
+        final Iterator<PriorityConsumerListPair> _outerIterator;
+        Iterator<QueueConsumerNode> _innerIterator;
+
+        private 
PrioritisedQueueConsumerNodeIterator(List<PriorityConsumerListPair> list)
+        {
+            _outerIterator = list.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            while (true)
+            {
+                if (_innerIterator != null && _innerIterator.hasNext())
+                {
+                    return true;
+                }
+                else if (_outerIterator.hasNext())
+                {
+                    final PriorityConsumerListPair priorityConsumersPair = 
_outerIterator.next();
+                    _innerIterator = 
priorityConsumersPair._consumers.iterator();
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+
+        @Override
+        public QueueConsumerNode next()
+        {
+            if (hasNext())
+            {
+                return _innerIterator.next();
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 Mon Nov 14 15:36:58 2016
@@ -20,115 +20,68 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Collection;
+import java.util.EnumSet;
 
 final class QueueConsumerNode
 {
-    private final AtomicBoolean _deleted = new AtomicBoolean();
-    private final AtomicReference<QueueConsumerNode> _next = new 
AtomicReference<>();
-
+    private final QueueConsumerManagerImpl _queueConsumerManager;
     private final QueueConsumer<?> _queueConsumer;
-    private volatile boolean _removed;
     private QueueConsumerNodeListEntry _listEntry;
+    private QueueConsumerManagerImpl.NodeState _state = 
QueueConsumerManagerImpl.NodeState.REMOVED;
+    private QueueConsumerNodeListEntry _allEntry;
 
-    QueueConsumerNode(final QueueConsumer<?> queueConsumer)
+    QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, 
final QueueConsumer<?> queueConsumer)
     {
+        _queueConsumerManager = queueConsumerManager;
         _queueConsumer = queueConsumer;
     }
 
-    public QueueConsumerNode()
-    {
-        //used for sentinel head and dummy node construction
-        _queueConsumer = null;
-        _deleted.set(true);
-    }
-
     public QueueConsumer<?> getQueueConsumer()
     {
         return _queueConsumer;
     }
 
-    public boolean isRemoved()
+    public QueueConsumerNodeListEntry getListEntry()
     {
-        return _removed;
+        return _listEntry;
     }
 
-    public void setRemoved()
+    public boolean moveFromTo(QueueConsumerManagerImpl.NodeState fromState, 
QueueConsumerManagerImpl.NodeState toState)
     {
-        _removed = true;
+        return moveFromTo(EnumSet.of(fromState), toState);
     }
 
+    public QueueConsumerManagerImpl.NodeState getState()
+    {
+        return _state;
+    }
 
-
-    /**
-     * Retrieves the first non-deleted node following the current node.
-     * Any deleted non-tail nodes encountered during the search are unlinked.
-     *
-     * @return the next non-deleted node, or null if none was found.
-     */
-    public QueueConsumerNode findNext()
+    public synchronized boolean 
moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates, 
QueueConsumerManagerImpl.NodeState toState)
     {
-        QueueConsumerNode next = nextNode();
-        while(next != null && next.isDeleted())
+        if (fromStates.contains(_state))
         {
-            final QueueConsumerNode newNext = next.nextNode();
-            if(newNext != null)
-            {
-                //try to move our _next reference forward to the 'newNext'
-                //node to unlink the deleted node
-                _next.compareAndSet(next, newNext);
-                next = nextNode();
-            }
-            else
+            if (_listEntry != null)
             {
-                //'newNext' is null, meaning 'next' is the current tail. Can't 
unlink
-                //the tail node for thread safety reasons, just use the null.
-                next = null;
+                _listEntry.remove();
             }
+            _state = toState;
+            _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+            return true;
+        }
+        else
+        {
+            return false;
         }
-
-        return next;
-    }
-
-    /**
-     * Gets the immediately next referenced node in the structure.
-     *
-     * @return the immediately next node in the structure, or null if at the 
tail.
-     */
-    protected QueueConsumerNode nextNode()
-    {
-        return _next.get();
-    }
-
-    /**
-     * Used to initialise the 'next' reference. Will only succeed if the 
reference was not previously set.
-     *
-     * @param node the ConsumerNode to set as 'next'
-     * @return whether the operation succeeded
-     */
-    boolean setNext(final QueueConsumerNode node)
-    {
-        return _next.compareAndSet(null, node);
-    }
-
-    public boolean isDeleted()
-    {
-        return _deleted.get();
-    }
-
-    public boolean delete()
-    {
-        return _deleted.compareAndSet(false,true);
     }
 
-    public void setListEntry(final QueueConsumerNodeListEntry listEntry)
+    public QueueConsumerNodeListEntry getAllEntry()
     {
-        _listEntry = listEntry;
+        return _allEntry;
     }
 
-    public QueueConsumerNodeListEntry getListEntry()
+    public void setAllEntry(final QueueConsumerNodeListEntry allEntry)
     {
-        return _listEntry;
+        _allEntry = allEntry;
     }
 }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
 Mon Nov 14 15:36:58 2016
@@ -25,14 +25,12 @@ import java.util.NoSuchElementException;
 
 public class QueueConsumerNodeIterator implements Iterator<QueueConsumerNode>
 {
-    private final QueueConsumerNodeList _queueConsumerNodeList;
     private QueueConsumerNodeListEntry _previous;
     private QueueConsumerNodeListEntry _next;
 
     QueueConsumerNodeIterator(QueueConsumerNodeList list)
     {
         _previous = list.getHead();
-        _queueConsumerNodeList = list;
     }
 
     @Override
@@ -61,10 +59,7 @@ public class QueueConsumerNodeIterator i
     @Override
     public void remove()
     {
-        if(_previous.isDeleted())
-        {
-            throw new IllegalStateException();
-        }
-        _queueConsumerNodeList.removeEntry(_previous);
+        // code should use QueueConsumerNodeListEntry#remove instead
+        throw new UnsupportedOperationException();
     }
 }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
 Mon Nov 14 15:36:58 2016
@@ -20,15 +20,28 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.qpid.server.model.Queue;
+
 class QueueConsumerNodeList
 {
-    private final QueueConsumerNodeListEntry _head = new 
QueueConsumerNodeListEntry();
+    private final QueueConsumerNodeListEntry _head;
 
-    private final AtomicReference<QueueConsumerNodeListEntry> _tail = new 
AtomicReference<>(_head);
+    private final AtomicReference<QueueConsumerNodeListEntry> _tail;
     private final AtomicInteger _size = new AtomicInteger();
+    private final AtomicInteger _scavengeCount = new AtomicInteger();
+    private final int _scavengeCountThreshold;
+
+    QueueConsumerNodeList(final Queue<?> queue)
+    {
+        _head = new QueueConsumerNodeListEntry(this);
+        _tail = new AtomicReference<>(_head);
+        _scavengeCountThreshold = queue.getContextValue(Integer.class, 
QUEUE_SCAVANGE_COUNT);
+    }
 
     private void insert(final QueueConsumerNodeListEntry node, final boolean 
count)
     {
@@ -58,22 +71,22 @@ class QueueConsumerNodeList
         }
     }
 
-    public void add(final QueueConsumerNode node)
+    public QueueConsumerNodeListEntry add(final QueueConsumerNode node) // now returns the entry created for the node
     {
-        QueueConsumerNodeListEntry entry = new 
QueueConsumerNodeListEntry(node);
+        QueueConsumerNodeListEntry entry = new 
QueueConsumerNodeListEntry(this, node); // the entry records this list so that entry.remove() can call back into it
         insert(entry, true);
-    }
-
-    public boolean remove(final QueueConsumerNode consumerNode)
-    {
-        return removeEntry(consumerNode.getListEntry());
+        return entry; // NOTE(review): presumably kept by the caller for a later QueueConsumerNodeListEntry#remove - confirm at call sites
     }
 
     boolean removeEntry(final QueueConsumerNodeListEntry entry)
     {
-        if (entry.delete())
+        if (entry.setDeleted()) // true only for the call that flips the entry's deleted flag from false to true
         {
             _size.decrementAndGet();
+            if (_scavengeCount.incrementAndGet() > _scavengeCountThreshold) // counts successful removals since the counter was last reset
+            {
+                scavenge();
+            }
             return true;
         }
         else
@@ -82,6 +95,16 @@ class QueueConsumerNodeList
         }
     }
 
+    private void scavenge() // called from removeEntry() when the removal count exceeds the threshold
+    {
+        _scavengeCount.set(0); // restart the removal count before walking the list
+        QueueConsumerNodeListEntry node = _head;
+        while (node != null)
+        {
+            node = node.findNext(); // NOTE(review): the walk presumably relies on findNext() unlinking deleted entries - confirm in QueueConsumerNodeListEntry
+        }
+    }
+
     public QueueConsumerNodeIterator iterator()
     {
         return new QueueConsumerNodeIterator(this);
@@ -96,7 +119,9 @@ class QueueConsumerNodeList
     {
         return _size.get();
     }
-}
-
-
 
+    public boolean isEmpty() // true when the size counter reads zero
+    {
+        return _size.get() == 0;
+    }
+}

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
 Mon Nov 14 15:36:58 2016
@@ -29,16 +29,17 @@ final class QueueConsumerNodeListEntry
     private final AtomicReference<QueueConsumerNodeListEntry> _next = new 
AtomicReference<>();
 
     private final QueueConsumerNode _queueConsumerNode;
-    private volatile boolean _removed;
+    private final QueueConsumerNodeList _list;
 
-    QueueConsumerNodeListEntry(final QueueConsumerNode queueConsumerNode)
+    QueueConsumerNodeListEntry(final QueueConsumerNodeList list, final 
QueueConsumerNode queueConsumerNode)
     {
+        _list = list; // owning list; remove() delegates to its removeEntry()
         _queueConsumerNode = queueConsumerNode;
-        queueConsumerNode.setListEntry(this);
     }
 
-    public QueueConsumerNodeListEntry()
+    public QueueConsumerNodeListEntry(QueueConsumerNodeList list)
     {
+        _list = list;
         //used for sentinel head and dummy node construction
         _queueConsumerNode = null;
         _deleted.set(true);
@@ -49,17 +50,6 @@ final class QueueConsumerNodeListEntry
         return _queueConsumerNode;
     }
 
-    public boolean isRemoved()
-    {
-        return _removed;
-    }
-
-    public void setRemoved()
-    {
-        _removed = true;
-    }
-
-
 
     /**
      * Retrieves the first non-deleted node following the current node.
@@ -112,14 +102,19 @@ final class QueueConsumerNodeListEntry
         return _next.compareAndSet(null, node);
     }
 
+    public void remove() // asks the list passed to the constructor to remove this entry
+    {
+        _list.removeEntry(this); // the boolean result of removeEntry() is discarded here
+    }
+
     public boolean isDeleted()
     {
         return _deleted.get();
     }
 
-    public boolean delete()
+    boolean setDeleted() // replaces the public delete(); no access modifier, so package-private
     {
-        return _deleted.compareAndSet(false,true);
+        return _deleted.compareAndSet(false, true); // true only if this call changed the flag from false to true
     }
 
 }

Modified: 
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
--- 
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
 (original)
+++ 
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
 Mon Nov 14 15:36:58 2016
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -48,15 +49,11 @@ public class StandardQueueEntryListTest
     private StandardQueueImpl _testQueue;
     private StandardQueueEntryList _sqel;
 
-    private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
-    private String oldScavengeValue = null;
     private ConfiguredObjectFactoryImpl _factory;
 
     @Override
     protected void setUp()
     {
-        oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
-
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.ID, UUID.randomUUID());
         queueAttributes.put(Queue.NAME, getName());
@@ -87,19 +84,6 @@ public class StandardQueueEntryListTest
     }
 
     @Override
-    protected void tearDown()
-    {
-        if(oldScavengeValue != null)
-        {
-            System.setProperty(SCAVENGE_PROP, oldScavengeValue);
-        }
-        else
-        {
-            System.clearProperty(SCAVENGE_PROP);
-        }
-    }
-
-    @Override
     public StandardQueueEntryList getTestList()
     {
         return getTestList(false);
@@ -160,7 +144,9 @@ public class StandardQueueEntryListTest
 
     public void testScavenge() throws Exception
     {
-        OrderedQueueEntryList sqel = new 
StandardQueueEntryList(mock(StandardQueueImpl.class));
+        StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
+        when(mockQueue.getContextValue(Integer.class, 
QUEUE_SCAVANGE_COUNT)).thenReturn(9);
+        OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue);
         ConcurrentMap<Integer,QueueEntry> entriesMap = new 
ConcurrentHashMap<Integer,QueueEntry>();
 
 




---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to