Author: ritchiem
Date: Fri Apr 3 14:46:29 2009
New Revision: 761700
URL: http://svn.apache.org/viewvc?rev=761700&view=rev
Log:
QPID-1764 : Resolved ConcurrentModificationException (CME). The 'syntax sugar'
for-each loop hides the iterator on which you need to call .remove(). Calling
remove on the underlying Map while iterating over it causes the CME.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761700&r1=761699&r2=761700&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
Fri Apr 3 14:46:29 2009
@@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Iterator;
public class BaseTransactionLog implements TransactionLog
{
@@ -80,15 +81,18 @@
Map<Long, ArrayList<AMQQueue>> messageMap =
context.getDequeueMap();
//For each Message ID that is in the map check
- for (Long messageID : messageMap.keySet())
+ Iterator iterator = messageMap.keySet().iterator();
+
+ while (iterator.hasNext())
{
+ Long messageID = (Long) iterator.next();
//If we don't have a gloabl reference for this message then
there is only a single enqueue
if (_idToQueues.get(messageID) == null)
{
// Add the removal of the message to this transaction
_delegate.removeMessage(context,messageID);
// Remove this message ID as we have processed it so we
don't reprocess after the main commmit
- messageMap.remove(messageID);
+ iterator.remove();
}
}
}
@@ -179,6 +183,15 @@
}
else
{
+ // When a message is on more than one queue it is
possible that this code section is executed
+ // by one thread per enqueue.
+ // It is, however, thread safe because only
removes are being performed, and so the
+ // last thread that does the remove will see the empty
queue and remove the message.
+ // At this stage there is nothing that is going to
cause this operation to abort, so we don't
+ // need to worry about any potential adds.
+ // The message will no longer be enqueued as that
operation has been committed before now, so
+ // this is clean up of the data.
+
// Update the enqueued list
enqueuedList.remove(queue);
@@ -195,6 +208,8 @@
//Commit the removes on the delegate.
_delegate.commitTran(removeContext);
+ // Mark this context as committed.
+ removeContext.commitTransaction();
}
finally
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]