Author: rgodfrey
Date: Fri Aug 28 00:19:29 2015
New Revision: 1698248

URL: http://svn.apache.org/r1698248
Log:
QPID-6713 : Ensure messages being processed by management or browsers are not 
concurrently removed from the store

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 28 00:19:29 2015
@@ -1304,15 +1304,29 @@ public abstract class AbstractQueue<X ex
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
-                    if (sub.acquires() && !assign(sub, entry))
+
+                    MessageReference messageReference = null;
+                    try
                     {
-                        // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                        // to acquire the entry for this consumer
-                        sub.restoreCredit(entry);
+
+                        if ((sub.acquires() && !assign(sub, entry))
+                            || (!sub.acquires() && (messageReference = entry.newMessageReference()) == null))
+                        {
+                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                            // to acquire the entry for this consumer
+                            sub.restoreCredit(entry);
+                        }
+                        else
+                        {
+                            deliverMessage(sub, entry, false);
+                        }
                     }
-                    else
+                    finally
                     {
-                        deliverMessage(sub, entry, false);
+                        if (messageReference != null)
+                        {
+                            messageReference.release();
+                        }
                     }
                 }
             }
@@ -1740,10 +1754,22 @@ public abstract class AbstractQueue<X ex
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && filter.accept(node))
+            MessageReference reference = node.newMessageReference();
+            if (reference != null)
             {
-                entryList.add(node);
+                try
+                {
+                    if (!node.isDeleted() && filter.accept(node))
+                    {
+                        entryList.add(node);
+                    }
+                }
+                finally
+                {
+                    reference.release();
+                }
             }
+
         }
         return entryList;
 
@@ -1756,12 +1782,21 @@ public abstract class AbstractQueue<X ex
         while(queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-
-            if(!node.isDeleted())
+            MessageReference reference = node.newMessageReference();
+            if(reference != null)
             {
-                if(visitor.visit(node))
+                try
                 {
-                    break;
+
+                    final boolean done = !node.isDeleted() && visitor.visit(node);
+                    if(done)
+                    {
+                        break;
+                    }
+                }
+                finally
+                {
+                    reference.release();
                 }
             }
         }
@@ -2149,17 +2184,29 @@ public abstract class AbstractQueue<X ex
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (sub.acquires() && !assign(sub, node))
+                        MessageReference messageReference = null;
+                        try
                         {
-                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this consumer
-                            sub.restoreCredit(node);
+
+                            if ((sub.acquires() && !assign(sub, node))
+                                || (!sub.acquires() && (messageReference = node.newMessageReference()) == null))
+                            {
+                                // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                                // to acquire the entry for this consumer
+                                sub.restoreCredit(node);
+                            }
+                            else
+                            {
+                                deliverMessage(sub, node, batch);
+                            }
                         }
-                        else
+                        finally
                         {
-                            deliverMessage(sub, node, batch);
+                            if (messageReference != null)
+                            {
+                                messageReference.release();
+                            }
                         }
-
                     }
                     else // Not enough Credit for message and wouldSuspend
                     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Aug 28 00:19:29 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
 
 public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
 {
@@ -36,4 +37,6 @@ public interface QueueEntry extends Mess
     QueueEntry getNextValidEntry();
 
     void setExpiration(long calculatedExpiration);
+
+    MessageReference newMessageReference();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 28 00:19:29 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -612,6 +612,19 @@ public abstract class QueueEntryImpl imp
         return (_flags & REDELIVERED_FLAG) != 0;
     }
 
+    @Override
+    public MessageReference newMessageReference()
+    {
+        try
+        {
+            return getMessage().newReference();
+        }
+        catch (MessageDeletedException mde)
+        {
+            return null;
+        }
+    }
+
     private class EntryInstanceProperties implements InstanceProperties
     {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to