Author: rgodfrey
Date: Mon Nov 16 09:52:43 2015
New Revision: 1714531
URL: http://svn.apache.org/viewvc?rev=1714531&view=rev
Log:
QPID-6848: Ensure clearQueue deletes acquired messages. For messages whose
acquisition is locked, delete them once the acquisition is unlocked.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
Mon Nov 16 09:52:43 2015
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
@@ -204,17 +205,17 @@ public interface VirtualHost<X extends V
void registerConnection(AMQPConnection<?> connection);
void deregisterConnection(AMQPConnection<?> connection);
- public static interface Transaction
+ interface Transaction
{
- void dequeue(MessageInstance entry);
+ void dequeue(QueueEntry entry);
- void copy(MessageInstance entry, Queue queue);
+ void copy(QueueEntry entry, Queue queue);
- void move(MessageInstance entry, Queue queue);
+ void move(QueueEntry entry, Queue queue);
}
- public static interface TransactionalOperation
+ interface TransactionalOperation
{
void withinTransaction(Transaction txn);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Mon Nov 16 09:52:43 2015
@@ -1859,8 +1859,17 @@ public abstract class AbstractQueue<X ex
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ final QueueEntry node = queueListIterator.getNode();
+ boolean acquired = node.acquireOrSteal(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ dequeueEntry(node);
+ }
+ });
+
+ if (acquired)
{
dequeueEntry(node, txn);
if(++count == request)
@@ -2457,23 +2466,21 @@ public abstract class AbstractQueue<X ex
long cumulativeQueueSize = 0;
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ final QueueEntry node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not
dequeued
if (!node.isDeleted())
{
// If the node has expired then acquire it
if (node.expired())
{
- boolean acquiredForDequeueing = node.acquire();
- if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ boolean acquiredForDequeueing = node.acquireOrSteal(new
Runnable()
{
- QueueConsumer consumer = (QueueConsumer)
node.getDeliveredConsumer();
- acquiredForDequeueing =
node.removeAcquisitionFromConsumer(consumer);
- if(acquiredForDequeueing)
+ @Override
+ public void run()
{
- consumer.acquisitionRemoved(node);
+ dequeueEntry(node);
}
- }
+ });
if(acquiredForDequeueing)
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Mon Nov 16 09:52:43 2015
@@ -30,6 +30,10 @@ public interface QueueEntry extends Mess
long getSize();
+ boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
+
+ QueueConsumer getDeliveredConsumer();
+
boolean isQueueDeleted();
QueueEntry getNextNode();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Mon Nov 16 09:52:43 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -208,6 +209,64 @@ public abstract class QueueEntryImpl imp
return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
+ private class DelayedAcquisitionStateListener implements
StateChangeListener<MessageInstance, State>
+ {
+ private final Runnable _task;
+ private final AtomicBoolean _run = new AtomicBoolean();
+
+ private DelayedAcquisitionStateListener(final Runnable task)
+ {
+ _task = task;
+ }
+
+ @Override
+ public void stateChanged(final MessageInstance object, final State
oldState, final State newState)
+ {
+ if(newState == State.DELETED || newState == State.DEQUEUED)
+ {
+ QueueEntryImpl.this.removeStateChangeListener(this);
+ }
+ else if(acquireOrSteal(null))
+ {
+ runTask();
+ }
+ }
+
+ void runTask()
+ {
+ QueueEntryImpl.this.removeStateChangeListener(this);
+ if(_run.compareAndSet(false,true))
+ {
+ _task.run();
+ }
+ }
+ }
+
+ @Override
+ public boolean acquireOrSteal(final Runnable delayedAcquisitionTask)
+ {
+ boolean acquired = acquire();
+ if(!acquired)
+ {
+ QueueConsumer consumer = getDeliveredConsumer();
+ acquired = removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved(this);
+ }
+ else if(delayedAcquisitionTask != null)
+ {
+ DelayedAcquisitionStateListener listener = new
DelayedAcquisitionStateListener(delayedAcquisitionTask);
+ addStateChangeListener(listener);
+ if(acquireOrSteal(null))
+ {
+ listener.runTask();
+ }
+ }
+ }
+ return acquired;
+ }
+
private boolean acquire(final EntryState state)
{
boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE,
state);
@@ -237,7 +296,13 @@ public abstract class QueueEntryImpl imp
EntryState state = _state;
if(state instanceof ConsumerAcquiredState)
{
- return _stateUpdater.compareAndSet(this, state,
((ConsumerAcquiredState)state).getLockedState());
+ LockedAcquiredState lockedState = ((ConsumerAcquiredState)
state).getLockedState();
+ boolean updated = _stateUpdater.compareAndSet(this, state,
lockedState);
+ if(updated)
+ {
+ notifyStateChange(state.getState(), lockedState.getState());
+ }
+ return updated;
}
return state instanceof LockedAcquiredState;
}
@@ -248,7 +313,13 @@ public abstract class QueueEntryImpl imp
EntryState state = _state;
if(state instanceof LockedAcquiredState)
{
- return _stateUpdater.compareAndSet(this, state,
((LockedAcquiredState)state).getUnlockedState());
+ ConsumerAcquiredState unlockedState = ((LockedAcquiredState)
state).getUnlockedState();
+ boolean updated = _stateUpdater.compareAndSet(this, state,
unlockedState);
+ if(updated)
+ {
+ notifyStateChange(state.getState(),unlockedState.getState());
+ }
+ return updated;
}
return false;
}
@@ -334,6 +405,7 @@ public abstract class QueueEntryImpl imp
}
+ @Override
public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Mon Nov 16 09:52:43 2015
@@ -67,7 +67,6 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
@@ -79,7 +78,6 @@ import org.apache.qpid.server.plugin.Sys
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -95,6 +93,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -1304,41 +1303,41 @@ public abstract class AbstractVirtualHos
public void executeTransaction(TransactionalOperation op)
{
- MessageStore store = getMessageStore();
+ final MessageStore store = getMessageStore();
final LocalTransaction txn = new LocalTransaction(store);
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance messageInstance)
+ public void dequeue(final QueueEntry messageInstance)
{
- boolean acquired = messageInstance.acquire();
- if(!acquired && messageInstance instanceof QueueEntry)
+ final ServerTransaction.Action deleteAction = new
ServerTransaction.Action()
{
- QueueEntry entry = (QueueEntry) messageInstance;
- QueueConsumer consumer = (QueueConsumer)
entry.getDeliveredConsumer();
- acquired =
messageInstance.removeAcquisitionFromConsumer(consumer);
- if(acquired)
+ public void postCommit()
{
-
consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ messageInstance.delete();
}
- }
- if(acquired)
- {
- txn.dequeue(messageInstance.getEnqueueRecord(), new
ServerTransaction.Action()
+
+ public void onRollback()
{
- public void postCommit()
- {
- messageInstance.delete();
- }
+ }
+ };
- public void onRollback()
- {
- }
- });
+ boolean acquired = messageInstance.acquireOrSteal(new
Runnable()
+ {
+
@Override
+ public
void run()
+ {
+
ServerTransaction txn = new AutoCommitTransaction(store);
+
txn.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
+ }
+ });
+ if(acquired)
+ {
+ txn.dequeue(messageInstance.getEnqueueRecord(),
deleteAction);
}
}
- public void copy(MessageInstance entry, Queue queue)
+ public void copy(QueueEntry entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = (AMQQueue)queue;
@@ -1357,7 +1356,7 @@ public abstract class AbstractVirtualHos
}
- public void move(final MessageInstance entry, Queue queue)
+ public void move(final QueueEntry entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = (AMQQueue)queue;
@@ -1377,23 +1376,21 @@ public abstract class AbstractVirtualHos
entry.release();
}
});
- if(entry instanceof QueueEntry)
- {
- txn.dequeue(entry.getEnqueueRecord(),
- new ServerTransaction.Action()
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
{
+ entry.delete();
+ }
- public void postCommit()
- {
- entry.delete();
- }
+ public void onRollback()
+ {
- public void onRollback()
- {
+ }
+ });
- }
- });
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]