Author: lquack
Date: Mon Nov 14 15:36:58 2016
New Revision: 1769654
URL: http://svn.apache.org/viewvc?rev=1769654&view=rev
Log:
Use own data structure in QueueConsumerManager and take consumer priority into
account
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
Mon Nov 14 15:36:58 2016
@@ -90,6 +90,11 @@ public interface Queue<X extends Queue<X
@ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
+ String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
+ @ManagedContextDefault( name = QUEUE_SCAVANGE_COUNT)
+ int DEFAULT_QUEUE_SCAVANGE_COUNT = 50;
+
+
String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
@SuppressWarnings("unused")
@ManagedContextDefault(name = MIME_TYPE_TO_FILE_EXTENSION, description =
"A mapping of MIME types to file extensions.")
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Mon Nov 14 15:36:58 2016
@@ -147,7 +147,7 @@ public abstract class AbstractQueue<X ex
private final QueueManagingVirtualHost<?> _virtualHost;
private final DeletedChildListener _deletedChildListener = new
DeletedChildListener();
- private final QueueConsumerManagerImpl _queueConsumerManager;
+ private QueueConsumerManagerImpl _queueConsumerManager;
@ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet =
"postSetAlternateExchange")
private Exchange _alternateExchange;
@@ -331,7 +331,6 @@ public abstract class AbstractQueue<X ex
super(parentsMap(virtualHost), attributes);
_virtualHost = virtualHost;
- _queueConsumerManager = new QueueConsumerManagerImpl(this);
}
@Override
@@ -397,6 +396,7 @@ public abstract class AbstractQueue<X ex
_arguments = Collections.synchronizedMap(arguments);
+ _queueConsumerManager = new QueueConsumerManagerImpl(this);
_logSubject = new QueueLogSubject(this);
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject =
Subject.getSubject(AccessController.getContext());
@@ -1924,7 +1924,7 @@ public abstract class AbstractQueue<X ex
}
}
- // need to take account of priority and potentially and existing
notified
+ // TODO need to take account of priority and potentially and existing
notified
// we don't want to notify lower priority consumers if there exists a
consumer in the notified set
// which can take the message (implies iterating such that you look at
for each priority look at interested then at notified)
final Iterator<QueueConsumer<?>> interestedIterator =
_queueConsumerManager.getInterestedIterator();
@@ -2031,7 +2031,7 @@ public abstract class AbstractQueue<X ex
QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
- if (subActive && (sub.getPriority() == Integer.MAX_VALUE ||
noHigherPriorityWithCredit(sub, node)))
+ if (node != null && subActive && (sub.getPriority() ==
Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
{
if (_virtualHost.getState() != State.ACTIVE)
@@ -2040,7 +2040,7 @@ public abstract class AbstractQueue<X ex
"virtualhost state
" + _virtualHost.getState());
}
- if (node != null && node.isAvailable())
+ if (node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2083,8 +2083,6 @@ public abstract class AbstractQueue<X ex
boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final
QueueEntry queueEntry)
{
- // TODO - should iterate over list of interested and notified
- // *** TODO HERE MONDAY *** - can we simply interleave bewteen
iterators over the two distinct lists of interested and notified
Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
while (consumerIterator.hasNext())
@@ -2097,6 +2095,10 @@ public abstract class AbstractQueue<X ex
return false;
}
}
+ else
+ {
+ break;
+ }
}
return true;
}
@@ -3040,7 +3042,9 @@ public abstract class AbstractQueue<X ex
}
else if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>)
Lists.newArrayList(_queueConsumerManager.getAllIterator());
+ return _queueConsumerManager == null
+ ? Collections.<C>emptySet()
+ : (Collection<C>)
Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
else return Collections.emptySet();
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
Mon Nov 14 15:36:58 2016
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -47,13 +49,14 @@ public abstract class OrderedQueueEntryL
_nextUpdater = OrderedQueueEntry._nextUpdater;
private AtomicLong _scavenges = new AtomicLong(0L);
- private final long _scavengeCount =
Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final long _scavengeCount;
private final AtomicReference<QueueEntry> _unscavengedHWM = new
AtomicReference<QueueEntry>();
public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
{
_queue = queue;
+ _scavengeCount = _queue.getContextValue(Integer.class,
QUEUE_SCAVANGE_COUNT);
_head = headCreator.createHead(this);
_tail = _head;
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
Mon Nov 14 15:36:58 2016
@@ -20,51 +20,66 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CopyOnWriteArrayList;
public class QueueConsumerManagerImpl implements QueueConsumerManager
{
private final AbstractQueue<?> _queue;
- private final ConcurrentLinkedDeque<QueueConsumerNode> _interested = new
ConcurrentLinkedDeque<QueueConsumerNode>();
- private final ConcurrentLinkedDeque<QueueConsumerNode> _notInterested =
new ConcurrentLinkedDeque<QueueConsumerNode>();
- private final ConcurrentLinkedDeque<QueueConsumerNode> _notified = new
ConcurrentLinkedDeque<QueueConsumerNode>();
- private final ConcurrentLinkedDeque<QueueConsumerNode> _nonAcquiring = new
ConcurrentLinkedDeque<QueueConsumerNode>();
+ private final List<PriorityConsumerListPair> _interested;
+ private final QueueConsumerNodeList _notInterested;
+ private final List<PriorityConsumerListPair> _notified;
+ private final QueueConsumerNodeList _nonAcquiring;
- private final Set<QueueConsumer<?>> _allConsumers =
Collections.newSetFromMap(new ConcurrentHashMap<QueueConsumer<?>, Boolean>());
+ private final List<PriorityConsumerListPair> _allConsumers;
private volatile int _count;
+ enum NodeState
+ {
+ REMOVED,
+ INTERESTED,
+ NOT_INTERESTED,
+ NOTIFIED,
+ NON_ACQUIRING;
+ }
+
public QueueConsumerManagerImpl(final AbstractQueue<?> queue)
{
_queue = queue;
+ _notInterested = new QueueConsumerNodeList(queue);
+ _interested = new CopyOnWriteArrayList<>();
+ _notified = new CopyOnWriteArrayList<>();
+ _nonAcquiring = new QueueConsumerNodeList(queue);
+ _allConsumers = new CopyOnWriteArrayList<>();
}
// Always in the config thread
@Override
public void addConsumer(final QueueConsumer<?> consumer)
{
- _allConsumers.add(consumer);
- QueueConsumerNode node = new QueueConsumerNode(consumer);
+ QueueConsumerNode node = new QueueConsumerNode(this, consumer);
+ addToAll(node);
+
consumer.setQueueConsumerNode(node);
- if(consumer.isNotifyWorkDesired())
+ if (consumer.isNotifyWorkDesired())
{
if (consumer.acquires())
{
- _interested.add(node);
+ node.moveFromTo(NodeState.REMOVED, NodeState.INTERESTED);
}
else
{
- _nonAcquiring.add(node);
+ node.moveFromTo(NodeState.REMOVED, NodeState.NON_ACQUIRING);
}
}
else
{
- _notInterested.add(node);
+ node.moveFromTo(NodeState.REMOVED, NodeState.NOT_INTERESTED);
}
_count++;
}
@@ -73,30 +88,13 @@ public class QueueConsumerManagerImpl im
@Override
public boolean removeConsumer(final QueueConsumer<?> consumer)
{
- _allConsumers.remove(consumer);
+ removeFromAll(consumer);
QueueConsumerNode node = consumer.getQueueConsumerNode();
- while(!node.isRemoved())
- {
- boolean removed = _notInterested.remove(node);
- if(!removed)
- {
- if(consumer.acquires())
- {
- removed = _interested.remove(node);
- removed = removed || _notified.remove(node);
- }
- else
- {
- removed = _nonAcquiring.remove(node);
- }
- }
- if(removed)
- {
- node.setRemoved();
- _count--;
- return true;
- }
+ if
(node.moveFromTo(EnumSet.complementOf(EnumSet.of(NodeState.REMOVED)),
NodeState.REMOVED))
+ {
+ _count--;
+ return true;
}
return false;
}
@@ -106,39 +104,26 @@ public class QueueConsumerManagerImpl im
public void setInterest(final QueueConsumer consumer, final boolean
interested)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
- if(interested)
+ if (interested)
{
- if(_notInterested.remove(node))
+ if (consumer.acquires())
{
- if(consumer.acquires())
- {
- _interested.add(node);
- }
- else
- {
- _nonAcquiring.add(node);
- }
+ node.moveFromTo(NodeState.NOT_INTERESTED,
NodeState.INTERESTED);
+ }
+ else
+ {
+ node.moveFromTo(NodeState.NOT_INTERESTED,
NodeState.NON_ACQUIRING);
}
}
else
{
- if(consumer.acquires())
+ if (consumer.acquires())
{
- while(!node.isRemoved())
- {
- if(_interested.remove(node) || _notified.remove(node))
- {
- _notInterested.add(node);
- break;
- }
- }
+ node.moveFromTo(EnumSet.of(NodeState.INTERESTED,
NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
}
else
{
- if(_nonAcquiring.remove(node))
- {
- _notInterested.add(node);
- }
+ node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING),
NodeState.NOT_INTERESTED);
}
}
}
@@ -148,18 +133,16 @@ public class QueueConsumerManagerImpl im
public boolean setNotified(final QueueConsumer consumer, final boolean
notified)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
- if(consumer.acquires())
+ if (consumer.acquires())
{
- if(notified)
+ if (notified)
{
// TODO - Fix responsibility
QueueEntry queueEntry;
- if((queueEntry = _queue.getNextAvailableEntry(consumer)) !=
null
- && _queue.noHigherPriorityWithCredit(consumer, queueEntry)
- && _interested.remove(node))
+ if ((queueEntry = _queue.getNextAvailableEntry(consumer)) !=
null
+ && _queue.noHigherPriorityWithCredit(consumer, queueEntry))
{
- _notified.add(node);
- return true;
+ return node.moveFromTo(NodeState.INTERESTED,
NodeState.NOTIFIED);
}
else
{
@@ -168,15 +151,7 @@ public class QueueConsumerManagerImpl im
}
else
{
- if(_notified.remove(node))
- {
- _interested.add(node);
- return true;
- }
- else
- {
- return false;
- }
+ return node.moveFromTo(NodeState.NOTIFIED,
NodeState.INTERESTED);
}
}
else
@@ -188,13 +163,13 @@ public class QueueConsumerManagerImpl im
@Override
public Iterator<QueueConsumer<?>> getInterestedIterator()
{
- return new QueueConsumerIterator(_interested.iterator());
+ return new QueueConsumerIterator(new
PrioritisedQueueConsumerNodeIterator(_interested));
}
@Override
public Iterator<QueueConsumer<?>> getAllIterator()
{
- return _allConsumers.iterator();
+ return new QueueConsumerIterator(new
PrioritisedQueueConsumerNodeIterator(_allConsumers));
}
@Override
@@ -221,6 +196,46 @@ public class QueueConsumerManagerImpl im
return _notified.size();
}
+ QueueConsumerNodeListEntry addNodeToInterestList(final QueueConsumerNode
queueConsumerNode)
+ {
+ QueueConsumerNodeListEntry newListEntry;
+ switch (queueConsumerNode.getState())
+ {
+ case INTERESTED:
+ newListEntry = null;
+ for (PriorityConsumerListPair pair : _interested)
+ {
+ if (pair._priority ==
queueConsumerNode.getQueueConsumer().getPriority())
+ {
+ newListEntry = pair._consumers.add(queueConsumerNode);
+ break;
+ }
+ }
+ break;
+ case NOT_INTERESTED:
+ newListEntry = _notInterested.add(queueConsumerNode);
+ break;
+ case NOTIFIED:
+ newListEntry = null;
+ for (PriorityConsumerListPair pair : _notified)
+ {
+ if (pair._priority ==
queueConsumerNode.getQueueConsumer().getPriority())
+ {
+ newListEntry = pair._consumers.add(queueConsumerNode);
+ break;
+ }
+ }
+ break;
+ case NON_ACQUIRING:
+ newListEntry = _nonAcquiring.add(queueConsumerNode);
+ break;
+ default:
+ newListEntry = null;
+ break;
+ }
+ return newListEntry;
+ }
+
private static class QueueConsumerIterator implements
Iterator<QueueConsumer<?>>
{
private final Iterator<QueueConsumerNode> _underlying;
@@ -248,4 +263,120 @@ public class QueueConsumerManagerImpl im
_underlying.remove();
}
}
+
+ private void addToAll(final QueueConsumerNode consumerNode)
+ {
+ int consumerPriority = consumerNode.getQueueConsumer().getPriority();
+ int i;
+ for (i = 0; i < _allConsumers.size(); ++i)
+ {
+ final PriorityConsumerListPair priorityConsumerListPair =
_allConsumers.get(i);
+ if (priorityConsumerListPair._priority == consumerPriority)
+ {
+ final QueueConsumerNodeListEntry entry =
priorityConsumerListPair._consumers.add(consumerNode);
+ consumerNode.setAllEntry(entry);
+ return;
+ }
+ else if (priorityConsumerListPair._priority < consumerPriority)
+ {
+ break;
+ }
+ }
+
+ PriorityConsumerListPair newPriorityConsumerListPair = new
PriorityConsumerListPair(consumerPriority);
+ final QueueConsumerNodeListEntry entry =
newPriorityConsumerListPair._consumers.add(consumerNode);
+ consumerNode.setAllEntry(entry);
+ _allConsumers.add(i, newPriorityConsumerListPair);
+ _notified.add(i, new PriorityConsumerListPair(consumerPriority));
+ _interested.add(i, new PriorityConsumerListPair(consumerPriority));
+ }
+
+ private void removeFromAll(final QueueConsumer<?> consumer)
+ {
+ final QueueConsumerNode node = consumer.getQueueConsumerNode();
+ int consumerPriority = consumer.getPriority();
+ for (int i = 0; i < _allConsumers.size(); ++i)
+ {
+ final PriorityConsumerListPair priorityConsumerListPair =
_allConsumers.get(i);
+ if (priorityConsumerListPair._priority == consumerPriority)
+ {
+
priorityConsumerListPair._consumers.removeEntry(node.getAllEntry());
+ if (priorityConsumerListPair._consumers.isEmpty())
+ {
+ _allConsumers.remove(i);
+ _notified.remove(i);
+ _interested.remove(i);
+ }
+ return;
+ }
+ else if (priorityConsumerListPair._priority < consumerPriority)
+ {
+ break;
+ }
+ }
+ }
+
+
+ private class PriorityConsumerListPair
+ {
+ final int _priority;
+ final QueueConsumerNodeList _consumers;
+
+ private PriorityConsumerListPair(final int priority)
+ {
+ _priority = priority;
+ _consumers = new QueueConsumerNodeList(_queue);
+ }
+ }
+
+ private class PrioritisedQueueConsumerNodeIterator implements
Iterator<QueueConsumerNode>
+ {
+ final Iterator<PriorityConsumerListPair> _outerIterator;
+ Iterator<QueueConsumerNode> _innerIterator;
+
+ private
PrioritisedQueueConsumerNodeIterator(List<PriorityConsumerListPair> list)
+ {
+ _outerIterator = list.iterator();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (true)
+ {
+ if (_innerIterator != null && _innerIterator.hasNext())
+ {
+ return true;
+ }
+ else if (_outerIterator.hasNext())
+ {
+ final PriorityConsumerListPair priorityConsumersPair =
_outerIterator.next();
+ _innerIterator =
priorityConsumersPair._consumers.iterator();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public QueueConsumerNode next()
+ {
+ if (hasNext())
+ {
+ return _innerIterator.next();
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
Mon Nov 14 15:36:58 2016
@@ -20,115 +20,68 @@
*/
package org.apache.qpid.server.queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Collection;
+import java.util.EnumSet;
final class QueueConsumerNode
{
- private final AtomicBoolean _deleted = new AtomicBoolean();
- private final AtomicReference<QueueConsumerNode> _next = new
AtomicReference<>();
-
+ private final QueueConsumerManagerImpl _queueConsumerManager;
private final QueueConsumer<?> _queueConsumer;
- private volatile boolean _removed;
private QueueConsumerNodeListEntry _listEntry;
+ private QueueConsumerManagerImpl.NodeState _state =
QueueConsumerManagerImpl.NodeState.REMOVED;
+ private QueueConsumerNodeListEntry _allEntry;
- QueueConsumerNode(final QueueConsumer<?> queueConsumer)
+ QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager,
final QueueConsumer<?> queueConsumer)
{
+ _queueConsumerManager = queueConsumerManager;
_queueConsumer = queueConsumer;
}
- public QueueConsumerNode()
- {
- //used for sentinel head and dummy node construction
- _queueConsumer = null;
- _deleted.set(true);
- }
-
public QueueConsumer<?> getQueueConsumer()
{
return _queueConsumer;
}
- public boolean isRemoved()
+ public QueueConsumerNodeListEntry getListEntry()
{
- return _removed;
+ return _listEntry;
}
- public void setRemoved()
+ public boolean moveFromTo(QueueConsumerManagerImpl.NodeState fromState,
QueueConsumerManagerImpl.NodeState toState)
{
- _removed = true;
+ return moveFromTo(EnumSet.of(fromState), toState);
}
+ public QueueConsumerManagerImpl.NodeState getState()
+ {
+ return _state;
+ }
-
- /**
- * Retrieves the first non-deleted node following the current node.
- * Any deleted non-tail nodes encountered during the search are unlinked.
- *
- * @return the next non-deleted node, or null if none was found.
- */
- public QueueConsumerNode findNext()
+ public synchronized boolean
moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
QueueConsumerManagerImpl.NodeState toState)
{
- QueueConsumerNode next = nextNode();
- while(next != null && next.isDeleted())
+ if (fromStates.contains(_state))
{
- final QueueConsumerNode newNext = next.nextNode();
- if(newNext != null)
- {
- //try to move our _next reference forward to the 'newNext'
- //node to unlink the deleted node
- _next.compareAndSet(next, newNext);
- next = nextNode();
- }
- else
+ if (_listEntry != null)
{
- //'newNext' is null, meaning 'next' is the current tail. Can't
unlink
- //the tail node for thread safety reasons, just use the null.
- next = null;
+ _listEntry.remove();
}
+ _state = toState;
+ _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+ return true;
+ }
+ else
+ {
+ return false;
}
-
- return next;
- }
-
- /**
- * Gets the immediately next referenced node in the structure.
- *
- * @return the immediately next node in the structure, or null if at the
tail.
- */
- protected QueueConsumerNode nextNode()
- {
- return _next.get();
- }
-
- /**
- * Used to initialise the 'next' reference. Will only succeed if the
reference was not previously set.
- *
- * @param node the ConsumerNode to set as 'next'
- * @return whether the operation succeeded
- */
- boolean setNext(final QueueConsumerNode node)
- {
- return _next.compareAndSet(null, node);
- }
-
- public boolean isDeleted()
- {
- return _deleted.get();
- }
-
- public boolean delete()
- {
- return _deleted.compareAndSet(false,true);
}
- public void setListEntry(final QueueConsumerNodeListEntry listEntry)
+ public QueueConsumerNodeListEntry getAllEntry()
{
- _listEntry = listEntry;
+ return _allEntry;
}
- public QueueConsumerNodeListEntry getListEntry()
+ public void setAllEntry(final QueueConsumerNodeListEntry allEntry)
{
- return _listEntry;
+ _allEntry = allEntry;
}
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
Mon Nov 14 15:36:58 2016
@@ -25,14 +25,12 @@ import java.util.NoSuchElementException;
public class QueueConsumerNodeIterator implements Iterator<QueueConsumerNode>
{
- private final QueueConsumerNodeList _queueConsumerNodeList;
private QueueConsumerNodeListEntry _previous;
private QueueConsumerNodeListEntry _next;
QueueConsumerNodeIterator(QueueConsumerNodeList list)
{
_previous = list.getHead();
- _queueConsumerNodeList = list;
}
@Override
@@ -61,10 +59,7 @@ public class QueueConsumerNodeIterator i
@Override
public void remove()
{
- if(_previous.isDeleted())
- {
- throw new IllegalStateException();
- }
- _queueConsumerNodeList.removeEntry(_previous);
+ // code should use QueueConsumerNodeListEntry#remove instead
+ throw new UnsupportedOperationException();
}
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
Mon Nov 14 15:36:58 2016
@@ -20,15 +20,28 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
+
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.qpid.server.model.Queue;
+
class QueueConsumerNodeList
{
- private final QueueConsumerNodeListEntry _head = new
QueueConsumerNodeListEntry();
+ private final QueueConsumerNodeListEntry _head;
- private final AtomicReference<QueueConsumerNodeListEntry> _tail = new
AtomicReference<>(_head);
+ private final AtomicReference<QueueConsumerNodeListEntry> _tail;
private final AtomicInteger _size = new AtomicInteger();
+ private final AtomicInteger _scavengeCount = new AtomicInteger();
+ private final int _scavengeCountThreshold;
+
+ QueueConsumerNodeList(final Queue<?> queue)
+ {
+ _head = new QueueConsumerNodeListEntry(this);
+ _tail = new AtomicReference<>(_head);
+ _scavengeCountThreshold = queue.getContextValue(Integer.class,
QUEUE_SCAVANGE_COUNT);
+ }
private void insert(final QueueConsumerNodeListEntry node, final boolean
count)
{
@@ -58,22 +71,22 @@ class QueueConsumerNodeList
}
}
- public void add(final QueueConsumerNode node)
+ public QueueConsumerNodeListEntry add(final QueueConsumerNode node)
{
- QueueConsumerNodeListEntry entry = new
QueueConsumerNodeListEntry(node);
+ QueueConsumerNodeListEntry entry = new
QueueConsumerNodeListEntry(this, node);
insert(entry, true);
- }
-
- public boolean remove(final QueueConsumerNode consumerNode)
- {
- return removeEntry(consumerNode.getListEntry());
+ return entry;
}
boolean removeEntry(final QueueConsumerNodeListEntry entry)
{
- if (entry.delete())
+ if (entry.setDeleted())
{
_size.decrementAndGet();
+ if (_scavengeCount.incrementAndGet() > _scavengeCountThreshold)
+ {
+ scavenge();
+ }
return true;
}
else
@@ -82,6 +95,16 @@ class QueueConsumerNodeList
}
}
+ private void scavenge()
+ {
+ _scavengeCount.set(0);
+ QueueConsumerNodeListEntry node = _head;
+ while (node != null)
+ {
+ node = node.findNext();
+ }
+ }
+
public QueueConsumerNodeIterator iterator()
{
return new QueueConsumerNodeIterator(this);
@@ -96,7 +119,9 @@ class QueueConsumerNodeList
{
return _size.get();
}
-}
-
-
+ public boolean isEmpty()
+ {
+ return _size.get() == 0;
+ }
+}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
Mon Nov 14 15:36:58 2016
@@ -29,16 +29,17 @@ final class QueueConsumerNodeListEntry
private final AtomicReference<QueueConsumerNodeListEntry> _next = new
AtomicReference<>();
private final QueueConsumerNode _queueConsumerNode;
- private volatile boolean _removed;
+ private final QueueConsumerNodeList _list;
- QueueConsumerNodeListEntry(final QueueConsumerNode queueConsumerNode)
+ QueueConsumerNodeListEntry(final QueueConsumerNodeList list, final
QueueConsumerNode queueConsumerNode)
{
+ _list = list;
_queueConsumerNode = queueConsumerNode;
- queueConsumerNode.setListEntry(this);
}
- public QueueConsumerNodeListEntry()
+ public QueueConsumerNodeListEntry(QueueConsumerNodeList list)
{
+ _list = list;
//used for sentinel head and dummy node construction
_queueConsumerNode = null;
_deleted.set(true);
@@ -49,17 +50,6 @@ final class QueueConsumerNodeListEntry
return _queueConsumerNode;
}
- public boolean isRemoved()
- {
- return _removed;
- }
-
- public void setRemoved()
- {
- _removed = true;
- }
-
-
/**
* Retrieves the first non-deleted node following the current node.
@@ -112,14 +102,19 @@ final class QueueConsumerNodeListEntry
return _next.compareAndSet(null, node);
}
+ public void remove()
+ {
+ _list.removeEntry(this);
+ }
+
public boolean isDeleted()
{
return _deleted.get();
}
- public boolean delete()
+ boolean setDeleted()
{
- return _deleted.compareAndSet(false,true);
+ return _deleted.compareAndSet(false, true);
}
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769654&r1=1769653&r2=1769654&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
Mon Nov 14 15:36:58 2016
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.model.Queue.QUEUE_SCAVANGE_COUNT;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -48,15 +49,11 @@ public class StandardQueueEntryListTest
private StandardQueueImpl _testQueue;
private StandardQueueEntryList _sqel;
- private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
- private String oldScavengeValue = null;
private ConfiguredObjectFactoryImpl _factory;
@Override
protected void setUp()
{
- oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
-
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
@@ -87,19 +84,6 @@ public class StandardQueueEntryListTest
}
@Override
- protected void tearDown()
- {
- if(oldScavengeValue != null)
- {
- System.setProperty(SCAVENGE_PROP, oldScavengeValue);
- }
- else
- {
- System.clearProperty(SCAVENGE_PROP);
- }
- }
-
- @Override
public StandardQueueEntryList getTestList()
{
return getTestList(false);
@@ -160,7 +144,9 @@ public class StandardQueueEntryListTest
public void testScavenge() throws Exception
{
- OrderedQueueEntryList sqel = new
StandardQueueEntryList(mock(StandardQueueImpl.class));
+ StandardQueueImpl mockQueue = mock(StandardQueueImpl.class);
+ when(mockQueue.getContextValue(Integer.class,
QUEUE_SCAVANGE_COUNT)).thenReturn(9);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(mockQueue);
ConcurrentMap<Integer,QueueEntry> entriesMap = new
ConcurrentHashMap<Integer,QueueEntry>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org