Author: rgodfrey
Date: Mon Nov 16 09:52:43 2015
New Revision: 1714531

URL: http://svn.apache.org/viewvc?rev=1714531&view=rev
Log:
QPID-6848: Ensure clearQueue deletes acquired messages. For messages whose
acquisition is locked, delete them once the acquisition is unlocked.

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Nov 16 09:52:43 2015
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener;
@@ -204,17 +205,17 @@ public interface VirtualHost<X extends V
     void registerConnection(AMQPConnection<?> connection);
     void deregisterConnection(AMQPConnection<?> connection);
 
-    public static interface Transaction
+    interface Transaction
     {
-        void dequeue(MessageInstance entry);
+        void dequeue(QueueEntry entry);
 
-        void copy(MessageInstance entry, Queue queue);
+        void copy(QueueEntry entry, Queue queue);
 
-        void move(MessageInstance entry, Queue queue);
+        void move(QueueEntry entry, Queue queue);
 
     }
 
-    public static interface TransactionalOperation
+    interface TransactionalOperation
     {
         void withinTransaction(Transaction txn);
     }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Nov 16 09:52:43 2015
@@ -1859,8 +1859,17 @@ public abstract class AbstractQueue<X ex
 
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
-            if (node.acquire())
+            final QueueEntry node = queueListIterator.getNode();
+            boolean acquired = node.acquireOrSteal(new Runnable()
+                                                    {
+                                                        @Override
+                                                        public void run()
+                                                        {
+                                                            dequeueEntry(node);
+                                                        }
+                                                    });
+
+            if (acquired)
             {
                 dequeueEntry(node, txn);
                 if(++count == request)
@@ -2457,23 +2466,21 @@ public abstract class AbstractQueue<X ex
         long cumulativeQueueSize = 0;
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            final QueueEntry node = queueListIterator.getNode();
             // Only process nodes that are not currently deleted and not dequeued
             if (!node.isDeleted())
             {
                 // If the node has expired then acquire it
                 if (node.expired())
                 {
-                    boolean acquiredForDequeueing = node.acquire();
-                    if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+                    boolean acquiredForDequeueing = node.acquireOrSteal(new Runnable()
                     {
-                        QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
-                        acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
-                        if(acquiredForDequeueing)
+                        @Override
+                        public void run()
                         {
-                            consumer.acquisitionRemoved(node);
+                            dequeueEntry(node);
                         }
-                    }
+                    });
 
                     if(acquiredForDequeueing)
                     {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Nov 16 09:52:43 2015
@@ -30,6 +30,10 @@ public interface QueueEntry extends Mess
 
     long getSize();
 
+    boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
+
+    QueueConsumer getDeliveredConsumer();
+
     boolean isQueueDeleted();
 
     QueueEntry getNextNode();

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Nov 16 09:52:43 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -208,6 +209,64 @@ public abstract class QueueEntryImpl imp
         return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
 
+    private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
+    {
+        private final Runnable _task;
+        private final AtomicBoolean _run = new AtomicBoolean();
+
+        private DelayedAcquisitionStateListener(final Runnable task)
+        {
+            _task = task;
+        }
+
+        @Override
+        public void stateChanged(final MessageInstance object, final State oldState, final State newState)
+        {
+            if(newState == State.DELETED || newState == State.DEQUEUED)
+            {
+                QueueEntryImpl.this.removeStateChangeListener(this);
+            }
+            else if(acquireOrSteal(null))
+            {
+                runTask();
+            }
+        }
+
+        void runTask()
+        {
+            QueueEntryImpl.this.removeStateChangeListener(this);
+            if(_run.compareAndSet(false,true))
+            {
+                _task.run();
+            }
+        }
+    }
+
+    @Override
+    public boolean acquireOrSteal(final Runnable delayedAcquisitionTask)
+    {
+        boolean acquired = acquire();
+        if(!acquired)
+        {
+            QueueConsumer consumer = getDeliveredConsumer();
+            acquired = removeAcquisitionFromConsumer(consumer);
+            if(acquired)
+            {
+                consumer.acquisitionRemoved(this);
+            }
+            else if(delayedAcquisitionTask != null)
+            {
+                DelayedAcquisitionStateListener listener = new DelayedAcquisitionStateListener(delayedAcquisitionTask);
+                addStateChangeListener(listener);
+                if(acquireOrSteal(null))
+                {
+                    listener.runTask();
+                }
+            }
+        }
+        return acquired;
+    }
+
     private boolean acquire(final EntryState state)
     {
         boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
@@ -237,7 +296,13 @@ public abstract class QueueEntryImpl imp
         EntryState state = _state;
         if(state instanceof ConsumerAcquiredState)
         {
-            return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+            LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
+            if(updated)
+            {
+                notifyStateChange(state.getState(), lockedState.getState());
+            }
+            return updated;
         }
         return state instanceof LockedAcquiredState;
     }
@@ -248,7 +313,13 @@ public abstract class QueueEntryImpl imp
         EntryState state = _state;
         if(state instanceof LockedAcquiredState)
         {
-            return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+            ConsumerAcquiredState unlockedState = ((LockedAcquiredState) state).getUnlockedState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, unlockedState);
+            if(updated)
+            {
+                notifyStateChange(state.getState(),unlockedState.getState());
+            }
+            return updated;
         }
         return false;
     }
@@ -334,6 +405,7 @@ public abstract class QueueEntryImpl imp
     }
 
 
+    @Override
     public QueueConsumer getDeliveredConsumer()
     {
         EntryState state = _state;

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1714531&r1=1714530&r2=1714531&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Nov 16 09:52:43 2015
@@ -67,7 +67,6 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageNode;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
@@ -79,7 +78,6 @@ import org.apache.qpid.server.plugin.Sys
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
@@ -95,6 +93,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -1304,41 +1303,41 @@ public abstract class AbstractVirtualHos
 
     public void executeTransaction(TransactionalOperation op)
     {
-        MessageStore store = getMessageStore();
+        final MessageStore store = getMessageStore();
         final LocalTransaction txn = new LocalTransaction(store);
 
         op.withinTransaction(new Transaction()
         {
-            public void dequeue(final MessageInstance messageInstance)
+            public void dequeue(final QueueEntry messageInstance)
             {
-                boolean acquired = messageInstance.acquire();
-                if(!acquired && messageInstance instanceof QueueEntry)
+                final ServerTransaction.Action deleteAction = new ServerTransaction.Action()
                 {
-                    QueueEntry entry = (QueueEntry) messageInstance;
-                    QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
-                    acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
-                    if(acquired)
+                    public void postCommit()
                     {
-                        consumer.acquisitionRemoved((QueueEntry)messageInstance);
+                        messageInstance.delete();
                     }
-                }
-                if(acquired)
-                {
-                    txn.dequeue(messageInstance.getEnqueueRecord(), new ServerTransaction.Action()
+
+                    public void onRollback()
                     {
-                        public void postCommit()
-                        {
-                            messageInstance.delete();
-                        }
+                    }
+                };
 
-                        public void onRollback()
-                        {
-                        }
-                    });
+                boolean acquired = messageInstance.acquireOrSteal(new Runnable()
+                                                                    {
+                                                                        @Override
+                                                                        public void run()
+                                                                        {
+                                                                            ServerTransaction txn = new AutoCommitTransaction(store);
+                                                                            txn.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
+                                                                        }
+                                                                    });
+                if(acquired)
+                {
+                    txn.dequeue(messageInstance.getEnqueueRecord(), deleteAction);
                 }
             }
 
-            public void copy(MessageInstance entry, Queue queue)
+            public void copy(QueueEntry entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = (AMQQueue)queue;
@@ -1357,7 +1356,7 @@ public abstract class AbstractVirtualHos
 
             }
 
-            public void move(final MessageInstance entry, Queue queue)
+            public void move(final QueueEntry entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = (AMQQueue)queue;
@@ -1377,23 +1376,21 @@ public abstract class AbstractVirtualHos
                                         entry.release();
                                     }
                                 });
-                    if(entry instanceof QueueEntry)
-                    {
-                        txn.dequeue(entry.getEnqueueRecord(),
-                                    new ServerTransaction.Action()
+                    txn.dequeue(entry.getEnqueueRecord(),
+                                new ServerTransaction.Action()
+                                {
+
+                                    public void postCommit()
                                     {
+                                        entry.delete();
+                                    }
 
-                                        public void postCommit()
-                                        {
-                                            entry.delete();
-                                        }
+                                    public void onRollback()
+                                    {
 
-                                        public void onRollback()
-                                        {
+                                    }
+                                });
 
-                                        }
-                                    });
-                    }
                 }
             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to