Author: rgodfrey
Date: Tue May 12 22:09:14 2015
New Revision: 1679113
URL: http://svn.apache.org/r1679113
Log:
QPID-6541 : [Java Broker] race condition when queue is transitioning from
recovering to recovered
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1679113&r1=1679112&r2=1679113&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Tue May 12 22:09:14 2015
@@ -258,7 +258,13 @@ public abstract class AbstractQueue<X ex
@ManagedAttributeField
private boolean _ensureNondestructiveConsumers;
- private final AtomicBoolean _recovering = new AtomicBoolean(true);
+ private static final int RECOVERING = 1;
+ private static final int COMPLETING_RECOVERY = 2;
+ private static final int RECOVERED = 3;
+
+ private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
+ private final AtomicInteger _enqueuingWhileRecovering = new
AtomicInteger(0);
+
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue =
new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
@@ -308,7 +314,7 @@ public abstract class AbstractQueue<X ex
});
}
- _recovering.set(false);
+ _recovering.set(RECOVERED);
}
@Override
@@ -1058,14 +1064,36 @@ public abstract class AbstractQueue<X ex
_totalMessagesReceived.incrementAndGet();
- if(_recovering.get())
+ if(_recovering.get() != RECOVERED)
{
- EnqueueRequest request = new EnqueueRequest(message, action,
enqueueRecord);
- _postRecoveryQueue.add(request);
+ _enqueuingWhileRecovering.incrementAndGet();
- // deal with the case the recovering status changed just as we
added to the post recovery queue
- if(!_recovering.get() && _postRecoveryQueue.remove(request))
+ boolean enqueueImmediately;
+ try
+ {
+ if(_recovering.get() == RECOVERING)
+ {
+ EnqueueRequest request = new EnqueueRequest(message,
action, enqueueRecord);
+ _postRecoveryQueue.add(request);
+ // deal with the case the recovering status changed just
as we added to the post recovery queue
+ enqueueImmediately = (_recovering.get() != RECOVERING) &&
_postRecoveryQueue.remove(request);
+ }
+ else
+ {
+ enqueueImmediately = true;
+ }
+ }
+ finally
{
+ _enqueuingWhileRecovering.decrementAndGet();
+ }
+
+ if(enqueueImmediately)
+ {
+ while(_recovering.get() != RECOVERED)
+ {
+ Thread.yield();
+ }
doEnqueue(message, action, enqueueRecord);
}
}
@@ -1090,14 +1118,21 @@ public abstract class AbstractQueue<X ex
@Override
public final void completeRecovery()
{
- if(_recovering.get())
+ if(_recovering.compareAndSet(RECOVERING, COMPLETING_RECOVERY))
{
- enqueueFromPostRecoveryQueue();
+ while(_enqueuingWhileRecovering.get() != 0)
+ {
+ Thread.yield();
+ }
- _recovering.set(false);
+ // at this point we can assert that any new enqueue to the queue
will not try to put into the post recovery
+ // queue (because the state is no longer RECOVERING), but also no
threads are currently trying to enqueue
+ // because the _enqueuingWhileRecovering count is 0.
- // deal with any enqueues that occurred just as we cleared the
queue
enqueueFromPostRecoveryQueue();
+
+ _recovering.set(RECOVERED);
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]