Author: rgodfrey
Date: Tue May 12 22:09:14 2015
New Revision: 1679113

URL: http://svn.apache.org/r1679113
Log:
QPID-6541 : [Java Broker] race condition when queue is transitioning from 
recovering to recovered

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1679113&r1=1679112&r2=1679113&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Tue May 12 22:09:14 2015
@@ -258,7 +258,13 @@ public abstract class AbstractQueue<X ex
     @ManagedAttributeField
     private boolean _ensureNondestructiveConsumers;
 
-    private final AtomicBoolean _recovering = new AtomicBoolean(true);
+    private static final int RECOVERING = 1;
+    private static final int COMPLETING_RECOVERY = 2;
+    private static final int RECOVERED = 3;
+
+    private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
+    private final AtomicInteger _enqueuingWhileRecovering = new 
AtomicInteger(0);
+
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = 
new ConcurrentLinkedQueue<>();
 
     private final QueueRunner _queueRunner = new QueueRunner(this);
@@ -308,7 +314,7 @@ public abstract class AbstractQueue<X ex
                          });
         }
 
-        _recovering.set(false);
+        _recovering.set(RECOVERED);
     }
 
     @Override
@@ -1058,14 +1064,36 @@ public abstract class AbstractQueue<X ex
 
         _totalMessagesReceived.incrementAndGet();
 
-        if(_recovering.get())
+        if(_recovering.get() != RECOVERED)
         {
-            EnqueueRequest request = new EnqueueRequest(message, action, 
enqueueRecord);
-            _postRecoveryQueue.add(request);
+            _enqueuingWhileRecovering.incrementAndGet();
 
-            // deal with the case the recovering status changed just as we 
added to the post recovery queue
-            if(!_recovering.get() && _postRecoveryQueue.remove(request))
+            boolean enqueueImmediately;
+            try
+            {
+                if(_recovering.get() == RECOVERING)
+                {
+                    EnqueueRequest request = new EnqueueRequest(message, 
action, enqueueRecord);
+                    _postRecoveryQueue.add(request);
+                    // deal with the case the recovering status changed just 
as we added to the post recovery queue
+                    enqueueImmediately = (_recovering.get() != RECOVERING) && 
_postRecoveryQueue.remove(request);
+                }
+                else
+                {
+                    enqueueImmediately = true;
+                }
+            }
+            finally
             {
+                _enqueuingWhileRecovering.decrementAndGet();
+            }
+
+            if(enqueueImmediately)
+            {
+                while(_recovering.get() != RECOVERED)
+                {
+                    Thread.yield();
+                }
                 doEnqueue(message, action, enqueueRecord);
             }
         }
@@ -1090,14 +1118,21 @@ public abstract class AbstractQueue<X ex
     @Override
     public final void completeRecovery()
     {
-        if(_recovering.get())
+        if(_recovering.compareAndSet(RECOVERING, COMPLETING_RECOVERY))
         {
-            enqueueFromPostRecoveryQueue();
+            while(_enqueuingWhileRecovering.get() != 0)
+            {
+                Thread.yield();
+            }
 
-            _recovering.set(false);
+            // at this point we can assert that any new enqueue to the queue 
will not try to put into the post recovery
+            // queue (because the state is no longer RECOVERING), but also no 
threads are currently trying to enqueue
+            // because the _enqueuingWhileRecovering count is 0.
 
-            // deal with any enqueues that occurred just as we cleared the 
queue
             enqueueFromPostRecoveryQueue();
+
+            _recovering.set(RECOVERED);
+
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to