Author: rgodfrey
Date: Fri Aug 28 00:19:29 2015
New Revision: 1698248
URL: http://svn.apache.org/r1698248
Log:
QPID-6713 : Ensure messages being processed by management or browsers are not
concurrently removed from the store
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Fri Aug 28 00:19:29 2015
@@ -1304,15 +1304,29 @@ public abstract class AbstractQueue<X ex
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
- if (sub.acquires() && !assign(sub, entry))
+
+ MessageReference messageReference = null;
+ try
{
- // restore credit here that would have been taken away
by wouldSuspend since we didn't manage
- // to acquire the entry for this consumer
- sub.restoreCredit(entry);
+
+ if ((sub.acquires() && !assign(sub, entry))
+ || (!sub.acquires() && (messageReference =
entry.newMessageReference()) == null))
+ {
+ // restore credit here that would have been taken
away by wouldSuspend since we didn't manage
+ // to acquire the entry for this consumer
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
+ }
}
- else
+ finally
{
- deliverMessage(sub, entry, false);
+ if (messageReference != null)
+ {
+ messageReference.release();
+ }
}
}
}
@@ -1740,10 +1754,22 @@ public abstract class AbstractQueue<X ex
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
+ MessageReference reference = node.newMessageReference();
+ if (reference != null)
{
- entryList.add(node);
+ try
+ {
+ if (!node.isDeleted() && filter.accept(node))
+ {
+ entryList.add(node);
+ }
+ }
+ finally
+ {
+ reference.release();
+ }
}
+
}
return entryList;
@@ -1756,12 +1782,21 @@ public abstract class AbstractQueue<X ex
while(queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
-
- if(!node.isDeleted())
+ MessageReference reference = node.newMessageReference();
+ if(reference != null)
{
- if(visitor.visit(node))
+ try
{
- break;
+
+ final boolean done = !node.isDeleted() &&
visitor.visit(node);
+ if(done)
+ {
+ break;
+ }
+ }
+ finally
+ {
+ reference.release();
}
}
}
@@ -2149,17 +2184,29 @@ public abstract class AbstractQueue<X ex
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !assign(sub, node))
+ MessageReference messageReference = null;
+ try
{
- // restore credit here that would have been taken
away by wouldSuspend since we didn't manage
- // to acquire the entry for this consumer
- sub.restoreCredit(node);
+
+ if ((sub.acquires() && !assign(sub, node))
+ || (!sub.acquires() && (messageReference =
node.newMessageReference()) == null))
+ {
+ // restore credit here that would have been
taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this consumer
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node, batch);
+ }
}
- else
+ finally
{
- deliverMessage(sub, node, batch);
+ if (messageReference != null)
+ {
+ messageReference.release();
+ }
}
-
}
else // Not enough Credit for message and wouldSuspend
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Fri Aug 28 00:19:29 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -36,4 +37,6 @@ public interface QueueEntry extends Mess
QueueEntry getNextValidEntry();
void setExpiration(long calculatedExpiration);
+
+ MessageReference newMessageReference();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1698248&r1=1698247&r2=1698248&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Fri Aug 28 00:19:29 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -612,6 +612,19 @@ public abstract class QueueEntryImpl imp
return (_flags & REDELIVERED_FLAG) != 0;
}
+ @Override
+ public MessageReference newMessageReference()
+ {
+ try
+ {
+ return getMessage().newReference();
+ }
+ catch (MessageDeletedException mde)
+ {
+ return null;
+ }
+ }
+
private class EntryInstanceProperties implements InstanceProperties
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]