Author: jstrachan
Date: Thu Feb 16 07:06:00 2006
New Revision: 378268
URL: http://svn.apache.org/viewcvs?rev=378268&view=rev
Log:
fix for AMQ-575 to avoid the deadlock when removing consumers, which can
sometimes cause messages to be redispatched when another consumer is ack-ing a
message. a good lesson this one - keep locks for as short a time as possible
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=378268&r1=378267&r2=378268&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Feb 16 07:06:00 2006
@@ -187,12 +187,14 @@
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups =
getMessageGroupOwners().removeConsumer(consumerId);
- synchronized (messages) {
- if (!sub.getConsumerInfo().isBrowser()) {
- MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
- try {
- msgContext.setDestination(destination);
+ if (!sub.getConsumerInfo().isBrowser()) {
+ MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
+ try {
+ msgContext.setDestination(destination);
+ // lets copy the messages to dispatch to avoid deadlock
+ List messagesToDispatch = new ArrayList();
+ synchronized (messages) {
for (Iterator iter = messages.iterator();
iter.hasNext();) {
IndirectMessageReference node =
(IndirectMessageReference) iter.next();
if (node.isDropped()) {
@@ -202,20 +204,25 @@
String groupID = node.getGroupID();
// Re-deliver all messages that the sub locked
- if (node.getLockOwner() == sub || wasExclusiveOwner
- || (groupID != null &&
ownedGroups.contains(groupID))) {
- node.incrementRedeliveryCounter();
- node.unlock();
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(context, node,
msgContext, consumers);
+ if (node.getLockOwner() == sub ||
wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
+ messagesToDispatch.add(node);
}
}
- } finally {
- msgContext.clear();
}
+
+ // now lets dispatch from the copy of the collection to
avoid deadlocks
+ for (Iterator iter = messagesToDispatch.iterator();
iter.hasNext();) {
+ IndirectMessageReference node =
(IndirectMessageReference) iter.next();
+ node.incrementRedeliveryCounter();
+ node.unlock();
+ msgContext.setMessageReference(node);
+ dispatchPolicy.dispatch(context, node, msgContext,
consumers);
+ }
+ }
+ finally {
+ msgContext.clear();
}
}
-
} finally {
dispatchValve.turnOn();
}