Author: rgodfrey
Date: Mon Nov 14 09:59:26 2016
New Revision: 1769585
URL: http://svn.apache.org/viewvc?rev=1769585&view=rev
Log:
Replace QueueConsumerList with QueueConsumerManager
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
(with props)
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
(with props)
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
(with props)
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
- copied, changed from r1769267,
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
- copied, changed from r1769267,
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
(with props)
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNode.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Mon Nov 14 09:59:26 2016
@@ -37,6 +37,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -57,6 +58,7 @@ import java.util.zip.GZIPOutputStream;
import javax.security.auth.Subject;
+import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -113,7 +115,6 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -146,18 +147,13 @@ public abstract class AbstractQueue<X ex
private final QueueManagingVirtualHost<?> _virtualHost;
private final DeletedChildListener _deletedChildListener = new
DeletedChildListener();
- private final AccessControlContext _immediateDeliveryContext;
+ private final QueueConsumerManagerImpl _queueConsumerManager;
@ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet =
"postSetAlternateExchange")
private Exchange _alternateExchange;
-
- private final QueueConsumerList _consumerList = new QueueConsumerList();
-
private volatile QueueConsumer<?> _exclusiveSubscriber;
-
-
private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
@@ -287,6 +283,8 @@ public abstract class AbstractQueue<X ex
void setNotifyWorkDesired(final QueueConsumer consumer, final boolean
desired)
{
+ _queueConsumerManager.setInterest(consumer, desired);
+
if (desired)
{
_activeSubscriberCount.incrementAndGet();
@@ -295,15 +293,31 @@ public abstract class AbstractQueue<X ex
{
_activeSubscriberCount.decrementAndGet();
- final ConsumerNodeIterator consumerNodeIterator =
_consumerList.iterator();
- while (consumerNodeIterator.advance())
- {
- final QueueConsumer s =
consumerNodeIterator.getNode().getConsumer();
- if (s != null && s.getPriority() < consumer.getPriority())
+ // iterate over interested and notify one as long as its priority
is higher than any notified
+ final Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getInterestedIterator();
+ while(consumerIterator.hasNext())
+ {
+ QueueConsumer<?> queueConsumer = consumerIterator.next();
+ //TODO - break here if the consumer has lower priority than
the highest notified (presuming iterator is priority ordered)
+ if(notifyConsumer(queueConsumer))
{
- s.notifyWork();
+ break;
}
}
+
+ }
+ }
+
+ private boolean notifyConsumer(final QueueConsumer<?> consumer)
+ {
+ if(_queueConsumerManager.setNotified(consumer, true))
+ {
+ consumer.notifyWork();
+ return true;
+ }
+ else
+ {
+ return false;
}
}
@@ -316,10 +330,8 @@ public abstract class AbstractQueue<X ex
{
super(parentsMap(virtualHost), attributes);
-
_virtualHost = virtualHost;
- _immediateDeliveryContext = getSystemTaskControllerContext("Immediate
Delivery", virtualHost.getPrincipal());
-
+ _queueConsumerManager = new QueueConsumerManagerImpl(this);
}
@Override
@@ -963,7 +975,7 @@ public abstract class AbstractQueue<X ex
}
consumer.setQueueContext(queueContext);
- _consumerList.add(consumer);
+ _queueConsumerManager.addConsumer(consumer);
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
@@ -987,7 +999,7 @@ public abstract class AbstractQueue<X ex
throw new NullPointerException("consumer argument is null");
}
- boolean removed = _consumerList.remove(consumer);
+ boolean removed = _queueConsumerManager.removeConsumer(consumer);
if (removed)
{
@@ -1041,16 +1053,10 @@ public abstract class AbstractQueue<X ex
@Override
public Collection<QueueConsumer<?>> getConsumers()
{
- List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
- ConsumerNodeIterator iter = _consumerList.iterator();
- while(iter.advance())
- {
- consumers.add(iter.getNode().getConsumer());
- }
- return consumers;
-
+ return Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
+
public void resetSubPointersForGroups(QueueConsumer<?> consumer)
{
QueueEntry entry =
_messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
@@ -1062,22 +1068,6 @@ public abstract class AbstractQueue<X ex
}
}
- @Override
- public void resetSubPointersForGroups(final QueueEntry entry)
- {
- ConsumerNodeIterator subscriberIter = _consumerList.iterator();
- // iterate over all the subscribers, and if they are in advance of
this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues())
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
- }
public void addBinding(final Binding<?> binding)
{
@@ -1209,7 +1199,7 @@ public abstract class AbstractQueue<X ex
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
- notifyAllConsumers();
+ notifyConsumers(entry);
}
checkForNotificationOnNewMessage(entry.getMessage());
@@ -1337,7 +1327,6 @@ public abstract class AbstractQueue<X ex
private void updateSubRequeueEntry(final QueueConsumer<?> sub, final
QueueEntry entry)
{
-
QueueContext subContext = sub.getQueueContext();
if(subContext != null)
{
@@ -1347,20 +1336,33 @@ public abstract class AbstractQueue<X ex
{
if(QueueContext._releasedUpdater.compareAndSet(subContext,
oldEntry, entry))
{
- sub.notifyWork();
+ notifyConsumer(sub);
break;
}
}
}
}
+
+ @Override
+ public void resetSubPointersForGroups(final QueueEntry entry)
+ {
+ resetSubPointers(entry, true);
+ }
+
+ @Override
public void requeue(QueueEntry entry)
{
- ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ resetSubPointers(entry, false);
+ }
+
+ private void resetSubPointers(final QueueEntry entry, final boolean
ignoreAvailable)
+ {
+ Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
// iterate over all the subscribers, and if they are in advance of
this queue entry then move them backwards
- while (subscriberIter.advance() && entry.isAvailable())
+ while (consumerIterator.hasNext() && (ignoreAvailable ||
entry.isAvailable()))
{
- QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?> sub = consumerIterator.next();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -1370,6 +1372,7 @@ public abstract class AbstractQueue<X ex
}
}
+
@Override
public void dequeue(QueueEntry entry)
{
@@ -1400,7 +1403,7 @@ public abstract class AbstractQueue<X ex
public int getConsumerCount()
{
- return _consumerList.size();
+ return _queueConsumerManager.getAllSize();
}
public int getConsumerCountWithCredit()
@@ -1527,9 +1530,9 @@ public abstract class AbstractQueue<X ex
/** Used to track bindings to exchanges so that on deletion they can
easily be cancelled. */
abstract QueueEntryList getEntries();
- protected QueueConsumerList getConsumerList()
+ protected final QueueConsumerManagerImpl getQueueConsumerManager()
{
- return _consumerList;
+ return _queueConsumerManager;
}
public EventLogger getEventLogger()
@@ -1771,14 +1774,16 @@ public abstract class AbstractQueue<X ex
{
try
{
- final ConsumerNodeIterator consumerNodeIterator =
_consumerList.iterator();
- while (consumerNodeIterator.advance())
+ Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext())
{
- final QueueConsumer s =
consumerNodeIterator.getNode().getConsumer();
- if (s != null)
+ QueueConsumer<?> consumer =
consumerIterator.next();
+
+ if (consumer != null)
{
- s.queueDeleted();
+ consumer.queueDeleted();
}
}
@@ -1906,31 +1911,66 @@ public abstract class AbstractQueue<X ex
}
}
- void notifyAllConsumers()
+ void notifyConsumers(QueueEntry entry)
{
- ConsumerNode consumerNode = _consumerList.getHead().findNext();
- while (consumerNode != null)
+
+ Iterator<QueueConsumer<?>> nonAcquiringIterator =
_queueConsumerManager.getNonAcquiringIterator();
+ while (nonAcquiringIterator.hasNext())
+ {
+ QueueConsumer<?> consumer = nonAcquiringIterator.next();
+ if(consumer.hasInterest(entry))
+ {
+ notifyConsumer(consumer);
+ }
+ }
+
+ // need to take account of priority and potentially and existing
notified
+ // we don't want to notify lower priority consumers if there exists a
consumer in the notified set
+ // which can take the message (implies iterating such that you look at
for each priority look at interested then at notified)
+ final Iterator<QueueConsumer<?>> interestedIterator =
_queueConsumerManager.getInterestedIterator();
+ while (interestedIterator.hasNext())
{
- QueueConsumer<?> consumer = consumerNode.getConsumer();
- if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
+ QueueConsumer<?> consumer = interestedIterator.next();
+ if(consumer.hasInterest(entry) && notifyConsumer(consumer))
{
- consumer.notifyWork();
+ break;
}
- consumerNode = consumerNode.findNext();
}
}
- MessageContainer deliverSingleMessage(QueueConsumer<?> sub)
+ void notifyConsumers()
+ {
+ final Iterator<QueueConsumer<?>> interestedIterator =
_queueConsumerManager.getInterestedIterator();
+ while (interestedIterator.hasNext())
+ {
+ QueueConsumer<?> consumer = interestedIterator.next();
+
+ if(notifyConsumer(consumer))
+ {
+ break;
+ }
+ }
+ }
+
+
+ MessageContainer deliverSingleMessage(QueueConsumer<?> consumer)
{
boolean queueEmpty = false;
MessageContainer messageContainer = null;
+ _queueConsumerManager.setNotified(consumer, false);
+
try
{
- if (!sub.isSuspended())
+ if (!consumer.isSuspended())
{
- messageContainer = attemptDelivery(sub);
- if (messageContainer == null && getNextAvailableEntry(sub) ==
null)
+ messageContainer = attemptDelivery(consumer);
+ if(messageContainer != null)
+ {
+ _queueConsumerManager.setNotified(consumer, true);
+ }
+
+ if (messageContainer == null &&
getNextAvailableEntry(consumer) == null)
{
queueEmpty = true;
}
@@ -1938,20 +1978,27 @@ public abstract class AbstractQueue<X ex
else
{
// avoid referring old deleted queue entry in
sub._queueContext._lastSeen
- getNextAvailableEntry(sub);
+ getNextAvailableEntry(consumer);
}
}
finally
{
if(queueEmpty)
{
- sub.queueEmpty();
+ consumer.queueEmpty();
}
- sub.flushBatched();
+ consumer.flushBatched();
}
-
+ if(messageContainer == null && consumer.acquires())
+ {
+ // TODO - Should be only checking for available messages
+ if(!isEmpty())
+ {
+ notifyConsumers();
+ }
+ }
return messageContainer;
}
@@ -1984,7 +2031,7 @@ public abstract class AbstractQueue<X ex
QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
- if (subActive && (sub.getPriority() == Integer.MAX_VALUE ||
noHigherPriorityWithCredit(sub)))
+ if (subActive && (sub.getPriority() == Integer.MAX_VALUE ||
noHigherPriorityWithCredit(sub, node)))
{
if (_virtualHost.getState() != State.ACTIVE)
@@ -2034,16 +2081,18 @@ public abstract class AbstractQueue<X ex
return messageContainer;
}
- private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub)
+ boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final
QueueEntry queueEntry)
{
- ConsumerNodeIterator iterator = _consumerList.iterator();
- while(iterator.advance())
+ // TODO - should iterate over list of interested and notified
+ // *** TODO HERE MONDAY *** - can we simply interleave bewteen
iterators over the two distinct lists of interested and notified
+ Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext())
{
- final ConsumerNode node = iterator.getNode();
- final QueueConsumer consumer = node.getConsumer();
+ QueueConsumer<?> consumer = consumerIterator.next();
if(consumer.getPriority() > sub.getPriority())
{
- if(getNextAvailableEntry(consumer) != null &&
consumer.isNotifyWorkDesired())
+ if(consumer.isNotifyWorkDesired() &&
consumer.hasInterest(queueEntry) && getNextAvailableEntry(consumer) != null)
{
return false;
}
@@ -2053,7 +2102,7 @@ public abstract class AbstractQueue<X ex
}
- private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+ QueueEntry getNextAvailableEntry(final QueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
@@ -2732,7 +2781,12 @@ public abstract class AbstractQueue<X ex
switch (getConsumerCount())
{
case 1:
- _exclusiveSubscriber =
getConsumerList().getHead().getConsumer();
+ Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
+
+ if (consumerIterator.hasNext())
+ {
+ _exclusiveSubscriber = consumerIterator.next();
+ }
// deliberate fall through
case 0:
_exclusiveOwner = null;
@@ -2753,8 +2807,11 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case CONNECTION:
AMQSessionModel session = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator =
_queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
+
if(session == null)
{
session = c.getSessionModel();
@@ -2779,8 +2836,10 @@ public abstract class AbstractQueue<X ex
case CONTAINER:
case PRINCIPAL:
AMQPConnection con = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator =
_queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(con == null)
{
con = c.getSessionModel().getAMQPConnection();
@@ -2807,8 +2866,10 @@ public abstract class AbstractQueue<X ex
case NONE:
case PRINCIPAL:
String containerID = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator =
_queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(containerID == null)
{
containerID =
c.getSessionModel().getAMQPConnection().getRemoteContainerName();
@@ -2838,8 +2899,10 @@ public abstract class AbstractQueue<X ex
case NONE:
case CONTAINER:
Principal principal = null;
- for(ConsumerImpl c : getConsumers())
+ Iterator<QueueConsumer<?>> queueConsumerIterator =
_queueConsumerManager.getAllIterator();
+ while(queueConsumerIterator.hasNext())
{
+ QueueConsumer<?> c = queueConsumerIterator.next();
if(principal == null)
{
principal =
c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
@@ -2977,7 +3040,7 @@ public abstract class AbstractQueue<X ex
}
else if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>) getConsumers();
+ return (Collection<C>)
Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
else return Collections.emptySet();
}
@@ -3429,24 +3492,19 @@ public abstract class AbstractQueue<X ex
@Override
public void execute()
{
-
// if there's (potentially) more than one consumer the others will
potentially not have been advanced to the
// next entry they are interested in yet. This would lead to
holding on to references to expired messages, etc
// which would give us memory "leak".
- ConsumerNodeIterator consumerNodeIterator =
_consumerList.iterator();
- while (consumerNodeIterator.advance() && !isDeleted())
+ Iterator<QueueConsumer<?>> consumerIterator =
_queueConsumerManager.getAllIterator();
+
+ while (consumerIterator.hasNext() && !isDeleted())
{
- ConsumerNode subNode = consumerNodeIterator.getNode();
- QueueConsumer sub = subNode.getConsumer();
+ QueueConsumer<?> sub = consumerIterator.next();
if(sub.acquires())
{
getNextAvailableEntry(sub);
}
- else
- {
- // TODO
- }
}
}
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
Mon Nov 14 09:59:26 2016
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Iterator;
import java.util.Map;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -36,10 +37,12 @@ public abstract class OutOfOrderQueue<X
protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// check that all consumers are not in advance of the entry
- ConsumerNodeIterator subIter = getConsumerList().iterator();
- while(subIter.advance() && !entry.isAcquired())
+ Iterator<QueueConsumer<?>> consumerIterator =
getQueueConsumerManager().getAllIterator();
+
+ while (consumerIterator.hasNext() && !entry.isAcquired())
{
- final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
+ QueueConsumer<?> consumer = consumerIterator.next();
+
if(!consumer.isClosed())
{
QueueContext context = consumer.getQueueContext();
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
Mon Nov 14 09:59:26 2016
@@ -38,6 +38,8 @@ public interface QueueConsumer<X extends
void acquisitionRemoved(QueueEntry node);
+ QueueConsumerNode getQueueConsumerNode();
+
void queueDeleted();
Queue<?> getQueue();
@@ -51,4 +53,6 @@ public interface QueueConsumer<X extends
boolean isNotifyWorkDesired();
void notifyWork();
+
+ void setQueueConsumerNode(QueueConsumerNode node);
}
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
Mon Nov 14 09:59:26 2016
@@ -97,6 +97,8 @@ class QueueConsumerImpl
@ManagedAttributeField
private int _priority;
+ private QueueConsumerNode _queueConsumerNode;
+
QueueConsumerImpl(final AbstractQueue<?> queue,
ConsumerTarget target,
final String consumerName,
@@ -257,6 +259,18 @@ class QueueConsumerImpl
_target.notifyWork();
}
+ @Override
+ public void setQueueConsumerNode(final QueueConsumerNode node)
+ {
+ _queueConsumerNode = node;
+ }
+
+ @Override
+ public QueueConsumerNode getQueueConsumerNode()
+ {
+ return _queueConsumerNode;
+ }
+
public void queueDeleted()
{
_target.consumerRemoved(this);
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769585&view=auto
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
(added)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
Mon Nov 14 09:59:26 2016
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Iterator;
+
+public interface QueueConsumerManager
+{
+ void addConsumer(QueueConsumer<?> consumer);
+ boolean removeConsumer(QueueConsumer<?> consumer);
+ /*public*/ void setInterest(QueueConsumer<?> consumer, boolean
interested); // called from Consumer
+ /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean
notified); // called from Queue
+
+ // should be priority and then insertion order
+ Iterator<QueueConsumer<?>> getInterestedIterator();
+
+ Iterator<QueueConsumer<?>> getAllIterator();
+ Iterator<QueueConsumer<?>> getNonAcquiringIterator();
+
+ Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator();
+
+ int getAllSize();
+ // int getInterestedSize();
+ int getNotifiedAcquiringSize();
+
+
+}
Propchange:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769585&view=auto
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
(added)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
Mon Nov 14 09:59:26 2016
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class QueueConsumerManagerImpl implements QueueConsumerManager
+{
+ private final AbstractQueue<?> _queue;
+
+ private final ConcurrentLinkedDeque<QueueConsumerNode> _interested = new
ConcurrentLinkedDeque<QueueConsumerNode>();
+ private final ConcurrentLinkedDeque<QueueConsumerNode> _notInterested =
new ConcurrentLinkedDeque<QueueConsumerNode>();
+ private final ConcurrentLinkedDeque<QueueConsumerNode> _notified = new
ConcurrentLinkedDeque<QueueConsumerNode>();
+ private final ConcurrentLinkedDeque<QueueConsumerNode> _nonAcquiring = new
ConcurrentLinkedDeque<QueueConsumerNode>();
+
+ private final Set<QueueConsumer<?>> _allConsumers =
Collections.newSetFromMap(new ConcurrentHashMap<QueueConsumer<?>, Boolean>());
+
+ private volatile int _count;
+
+ public QueueConsumerManagerImpl(final AbstractQueue<?> queue)
+ {
+ _queue = queue;
+ }
+
+ // Always in the config thread
+ @Override
+ public void addConsumer(final QueueConsumer<?> consumer)
+ {
+ _allConsumers.add(consumer);
+ QueueConsumerNode node = new QueueConsumerNode(consumer);
+ consumer.setQueueConsumerNode(node);
+ if(consumer.isNotifyWorkDesired())
+ {
+ if (consumer.acquires())
+ {
+ _interested.add(node);
+ }
+ else
+ {
+ _nonAcquiring.add(node);
+ }
+ }
+ else
+ {
+ _notInterested.add(node);
+ }
+ _count++;
+ }
+
+ // Always in the config thread
+ @Override
+ public boolean removeConsumer(final QueueConsumer<?> consumer)
+ {
+ _allConsumers.remove(consumer);
+ QueueConsumerNode node = consumer.getQueueConsumerNode();
+ while(!node.isRemoved())
+ {
+ boolean removed = _notInterested.remove(node);
+ if(!removed)
+ {
+ if(consumer.acquires())
+ {
+ removed = _interested.remove(node);
+ removed = removed || _notified.remove(node);
+ }
+ else
+ {
+ removed = _nonAcquiring.remove(node);
+ }
+ }
+
+ if(removed)
+ {
+ node.setRemoved();
+ _count--;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Set by the consumer always in the IO thread
+ @Override
+ public void setInterest(final QueueConsumer consumer, final boolean
interested)
+ {
+ QueueConsumerNode node = consumer.getQueueConsumerNode();
+ if(interested)
+ {
+ if(_notInterested.remove(node))
+ {
+ if(consumer.acquires())
+ {
+ _interested.add(node);
+ }
+ else
+ {
+ _nonAcquiring.add(node);
+ }
+ }
+ }
+ else
+ {
+ if(consumer.acquires())
+ {
+ while(!node.isRemoved())
+ {
+ if(_interested.remove(node) || _notified.remove(node))
+ {
+ _notInterested.add(node);
+ break;
+ }
+ }
+ }
+ else
+ {
+ if(_nonAcquiring.remove(node))
+ {
+ _notInterested.add(node);
+ }
+ }
+ }
+ }
+
+ // Set by the Queue any IO thread
+ @Override
+ public boolean setNotified(final QueueConsumer consumer, final boolean
notified)
+ {
+ QueueConsumerNode node = consumer.getQueueConsumerNode();
+ if(consumer.acquires())
+ {
+ if(notified)
+ {
+ // TODO - Fix responsibility
+ QueueEntry queueEntry;
+ if((queueEntry = _queue.getNextAvailableEntry(consumer)) !=
null
+ && _queue.noHigherPriorityWithCredit(consumer, queueEntry)
+ && _interested.remove(node))
+ {
+ _notified.add(node);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ if(_notified.remove(node))
+ {
+ _interested.add(node);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ @Override
+ public Iterator<QueueConsumer<?>> getInterestedIterator()
+ {
+ return new QueueConsumerIterator(_interested.iterator());
+ }
+
+ @Override
+ public Iterator<QueueConsumer<?>> getAllIterator()
+ {
+ return _allConsumers.iterator();
+ }
+
+ @Override
+ public Iterator<QueueConsumer<?>> getNonAcquiringIterator()
+ {
+ return new QueueConsumerIterator(_nonAcquiring.iterator());
+ }
+
+ @Override
+ public Iterator<QueueConsumer<?>>
getPrioritySortedNotifiedOrInterestedIterator()
+ {
+ return null;
+ }
+
+ @Override
+ public int getAllSize()
+ {
+ return _count;
+ }
+
+ @Override
+ public int getNotifiedAcquiringSize()
+ {
+ return _notified.size();
+ }
+
+ private static class QueueConsumerIterator implements
Iterator<QueueConsumer<?>>
+ {
+ private final Iterator<QueueConsumerNode> _underlying;
+
+ private QueueConsumerIterator(final Iterator<QueueConsumerNode>
underlying)
+ {
+ _underlying = underlying;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return _underlying.hasNext();
+ }
+
+ @Override
+ public QueueConsumer<?> next()
+ {
+ return _underlying.next().getQueueConsumer();
+ }
+
+ @Override
+ public void remove()
+ {
+ _underlying.remove();
+ }
+ }
+}
Propchange:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1769585&view=auto
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
(added)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
Mon Nov 14 09:59:26 2016
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+final class QueueConsumerNode
+{
+ private final AtomicBoolean _deleted = new AtomicBoolean();
+ private final AtomicReference<QueueConsumerNode> _next = new
AtomicReference<>();
+
+ private final QueueConsumer<?> _queueConsumer;
+ private volatile boolean _removed;
+ private QueueConsumerNodeListEntry _listEntry;
+
+ QueueConsumerNode(final QueueConsumer<?> queueConsumer)
+ {
+ _queueConsumer = queueConsumer;
+ }
+
+ public QueueConsumerNode()
+ {
+ //used for sentinel head and dummy node construction
+ _queueConsumer = null;
+ _deleted.set(true);
+ }
+
+ public QueueConsumer<?> getQueueConsumer()
+ {
+ return _queueConsumer;
+ }
+
+ public boolean isRemoved()
+ {
+ return _removed;
+ }
+
+ public void setRemoved()
+ {
+ _removed = true;
+ }
+
+
+
+ /**
+ * Retrieves the first non-deleted node following the current node.
+ * Any deleted non-tail nodes encountered during the search are unlinked.
+ *
+ * @return the next non-deleted node, or null if none was found.
+ */
+ public QueueConsumerNode findNext()
+ {
+ QueueConsumerNode next = nextNode();
+ while(next != null && next.isDeleted())
+ {
+ final QueueConsumerNode newNext = next.nextNode();
+ if(newNext != null)
+ {
+ //try to move our _next reference forward to the 'newNext'
+ //node to unlink the deleted node
+ _next.compareAndSet(next, newNext);
+ next = nextNode();
+ }
+ else
+ {
+ //'newNext' is null, meaning 'next' is the current tail. Can't
unlink
+ //the tail node for thread safety reasons, just use the null.
+ next = null;
+ }
+ }
+
+ return next;
+ }
+
+ /**
+ * Gets the immediately next referenced node in the structure.
+ *
+ * @return the immediately next node in the structure, or null if at the
tail.
+ */
+ protected QueueConsumerNode nextNode()
+ {
+ return _next.get();
+ }
+
+ /**
+ * Used to initialise the 'next' reference. Will only succeed if the
reference was not previously set.
+ *
+ * @param node the ConsumerNode to set as 'next'
+ * @return whether the operation succeeded
+ */
+ boolean setNext(final QueueConsumerNode node)
+ {
+ return _next.compareAndSet(null, node);
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted.get();
+ }
+
+ public boolean delete()
+ {
+ return _deleted.compareAndSet(false,true);
+ }
+
+ public void setListEntry(final QueueConsumerNodeListEntry listEntry)
+ {
+ _listEntry = listEntry;
+ }
+
+ public QueueConsumerNodeListEntry getListEntry()
+ {
+ return _listEntry;
+ }
+}
Propchange:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
(from r1769267,
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java)
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java&r1=1769267&r2=1769585&rev=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/ConsumerNodeIterator.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeIterator.java
Mon Nov 14 09:59:26 2016
@@ -20,24 +20,51 @@
*/
package org.apache.qpid.server.queue;
-public class ConsumerNodeIterator
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class QueueConsumerNodeIterator implements Iterator<QueueConsumerNode>
{
- private ConsumerNode _lastNode;
+ private final QueueConsumerNodeList _queueConsumerNodeList;
+ private QueueConsumerNodeListEntry _previous;
+ private QueueConsumerNodeListEntry _next;
- ConsumerNodeIterator(ConsumerNode startNode)
+ QueueConsumerNodeIterator(QueueConsumerNodeList list)
{
- _lastNode = startNode;
+ _previous = list.getHead();
+ _queueConsumerNodeList = list;
}
- public ConsumerNode getNode()
+ @Override
+ public boolean hasNext()
{
- return _lastNode;
+ _next = _previous.findNext();
+ return _next != null;
}
- public boolean advance()
+ @Override
+ public QueueConsumerNode next()
{
- _lastNode = _lastNode.findNext();
+ if(_next == null)
+ {
+ _next = _previous.findNext();
+ }
+ if(_next == null)
+ {
+ throw new NoSuchElementException();
+ }
+ _previous = _next;
+ _next = null;
+ return _previous.getQueueConsumerNode();
+ }
- return _lastNode != null;
+ @Override
+ public void remove()
+ {
+ if(_previous.isDeleted())
+ {
+ throw new IllegalStateException();
+ }
+ _queueConsumerNodeList.removeEntry(_previous);
}
}
Copied:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
(from r1769267,
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java)
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java?p2=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java&p1=qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java&r1=1769267&r2=1769585&rev=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeList.java
Mon Nov 14 09:59:26 2016
@@ -23,20 +23,19 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class QueueConsumerList
+class QueueConsumerNodeList
{
- private final ConsumerNode _head = new ConsumerNode();
+ private final QueueConsumerNodeListEntry _head = new
QueueConsumerNodeListEntry();
- private final AtomicReference<ConsumerNode> _tail = new
AtomicReference<ConsumerNode>(_head);
- private final AtomicReference<ConsumerNode> _subNodeMarker = new
AtomicReference<ConsumerNode>(_head);
+ private final AtomicReference<QueueConsumerNodeListEntry> _tail = new
AtomicReference<>(_head);
private final AtomicInteger _size = new AtomicInteger();
- private void insert(final ConsumerNode node, final boolean count)
+ private void insert(final QueueConsumerNodeListEntry node, final boolean
count)
{
for (;;)
{
- ConsumerNode tail = _tail.get();
- ConsumerNode next = tail.nextNode();
+ QueueConsumerNodeListEntry tail = _tail.get();
+ QueueConsumerNodeListEntry next = tail.nextNode();
if (tail == _tail.get())
{
if (next == null)
@@ -59,103 +58,36 @@ class QueueConsumerList
}
}
- public void add(final QueueConsumer<?> sub)
+ public void add(final QueueConsumerNode node)
{
- ConsumerNode node = new ConsumerNode(sub);
- insert(node, true);
+ QueueConsumerNodeListEntry entry = new
QueueConsumerNodeListEntry(node);
+ insert(entry, true);
}
- public boolean remove(final QueueConsumer<?> sub)
+ public boolean remove(final QueueConsumerNode consumerNode)
{
- ConsumerNode prevNode = _head;
- ConsumerNode node = _head.nextNode();
-
- while(node != null)
- {
- if(sub.equals(node.getConsumer()) && node.delete())
- {
- _size.decrementAndGet();
-
- ConsumerNode tail = _tail.get();
- if(node == tail)
- {
- //we cant remove the last node from the structure for
- //correctness reasons, however we have just 'deleted'
- //the tail. Inserting an empty dummy node after it will
- //let us scavenge the node containing the Consumer.
- insert(new ConsumerNode(), false);
- }
-
- //advance the next node reference in the 'prevNode' to scavenge
- //the newly 'deleted' node for the Consumer.
- prevNode.findNext();
-
- nodeMarkerCleanup(node);
-
- return true;
- }
-
- prevNode = node;
- node = node.findNext();
- }
-
- return false;
+ return removeEntry(consumerNode.getListEntry());
}
- private void nodeMarkerCleanup(final ConsumerNode node)
+ boolean removeEntry(final QueueConsumerNodeListEntry entry)
{
- ConsumerNode markedNode = _subNodeMarker.get();
- if(node == markedNode)
+ if (entry.delete())
{
- //if the marked node is the one we are removing, then
- //replace it with a dummy pointing at the next node.
- //this is OK as the marked node is only used to index
- //into the list and find the next node to use.
- //Because we inserted a dummy if node was the
- //tail, markedNode.nextNode() can never be null.
- ConsumerNode dummy = new ConsumerNode();
- dummy.setNext(markedNode.nextNode());
-
- //if the CAS fails the marked node has changed, thus
- //we don't care about the dummy and just forget it
- _subNodeMarker.compareAndSet(markedNode, dummy);
+ _size.decrementAndGet();
+ return true;
}
- else if(markedNode != null)
+ else
{
- //if the marked node was already deleted then it could
- //hold subsequently removed nodes after it in the list
- //in memory. Scavenge it to ensure their actual removal.
- if(markedNode != _head && markedNode.isDeleted())
- {
- markedNode.findNext();
- }
+ return false;
}
}
- public boolean updateMarkedNode(final ConsumerNode expected, final
ConsumerNode nextNode)
- {
- return _subNodeMarker.compareAndSet(expected, nextNode);
- }
-
- /**
- * Get the current marked ConsumerNode. This should only be used only to
index into the list and find the next node
- * after the mark, since if the previously marked node was subsequently
deleted the item returned may be a dummy node
- * with reference to the next node.
- *
- * @return the previously marked node (or a dummy if it was subsequently
deleted)
- */
- public ConsumerNode getMarkedNode()
- {
- return _subNodeMarker.get();
- }
-
-
- public ConsumerNodeIterator iterator()
+ public QueueConsumerNodeIterator iterator()
{
- return new ConsumerNodeIterator(_head);
+ return new QueueConsumerNodeIterator(this);
}
- public ConsumerNode getHead()
+ public QueueConsumerNodeListEntry getHead()
{
return _head;
}
Added:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java?rev=1769585&view=auto
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
(added)
+++
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
Mon Nov 14 09:59:26 2016
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+final class QueueConsumerNodeListEntry
+{
+ private final AtomicBoolean _deleted = new AtomicBoolean();
+ private final AtomicReference<QueueConsumerNodeListEntry> _next = new
AtomicReference<>();
+
+ private final QueueConsumerNode _queueConsumerNode;
+ private volatile boolean _removed;
+
+ QueueConsumerNodeListEntry(final QueueConsumerNode queueConsumerNode)
+ {
+ _queueConsumerNode = queueConsumerNode;
+ queueConsumerNode.setListEntry(this);
+ }
+
+ public QueueConsumerNodeListEntry()
+ {
+ //used for sentinel head and dummy node construction
+ _queueConsumerNode = null;
+ _deleted.set(true);
+ }
+
+ public QueueConsumerNode getQueueConsumerNode()
+ {
+ return _queueConsumerNode;
+ }
+
+ public boolean isRemoved()
+ {
+ return _removed;
+ }
+
+ public void setRemoved()
+ {
+ _removed = true;
+ }
+
+
+
+ /**
+ * Retrieves the first non-deleted node following the current node.
+ * Any deleted non-tail nodes encountered during the search are unlinked.
+ *
+ * @return the next non-deleted node, or null if none was found.
+ */
+ public QueueConsumerNodeListEntry findNext()
+ {
+ QueueConsumerNodeListEntry next = nextNode();
+ while(next != null && next.isDeleted())
+ {
+ final QueueConsumerNodeListEntry newNext = next.nextNode();
+ if(newNext != null)
+ {
+ //try to move our _next reference forward to the 'newNext'
+ //node to unlink the deleted node
+ _next.compareAndSet(next, newNext);
+ next = nextNode();
+ }
+ else
+ {
+ //'newNext' is null, meaning 'next' is the current tail. Can't
unlink
+ //the tail node for thread safety reasons, just use the null.
+ next = null;
+ }
+ }
+
+ return next;
+ }
+
+ /**
+ * Gets the immediately next referenced node in the structure.
+ *
+ * @return the immediately next node in the structure, or null if at the
tail.
+ */
+ protected QueueConsumerNodeListEntry nextNode()
+ {
+ return _next.get();
+ }
+
+ /**
+ * Used to initialise the 'next' reference. Will only succeed if the
reference was not previously set.
+ *
+ * @param node the ConsumerNode to set as 'next'
+ * @return whether the operation succeeded
+ */
+ boolean setNext(final QueueConsumerNodeListEntry node)
+ {
+ return _next.compareAndSet(null, node);
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted.get();
+ }
+
+ public boolean delete()
+ {
+ return _deleted.compareAndSet(false,true);
+ }
+
+}
Propchange:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNodeListEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1769585&r1=1769584&r2=1769585&view=diff
==============================================================================
---
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
(original)
+++
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
Mon Nov 14 09:59:26 2016
@@ -1391,7 +1391,6 @@ public class AMQPConnection_0_8Impl
final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
-
listener.performAction(this);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]