Author: ritchiem
Date: Fri Apr  3 14:46:29 2009
New Revision: 761700

URL: http://svn.apache.org/viewvc?rev=761700&view=rev
Log:
QPID-1764 : Resolved ConcurrentME. Perils of using the 'syntax sugar' for loop 
hides the message iterator that you need to call .remove(). Calling remove on 
the underlying Map will cause the resulting CME.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761700&r1=761699&r2=761700&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
 Fri Apr  3 14:46:29 2009
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Iterator;
 
 public class BaseTransactionLog implements TransactionLog
 {
@@ -80,15 +81,18 @@
             Map<Long, ArrayList<AMQQueue>> messageMap = 
context.getDequeueMap();
 
             //For each Message ID that is in the map check
-            for (Long messageID : messageMap.keySet())
+            Iterator iterator = messageMap.keySet().iterator();
+
+            while (iterator.hasNext())
             {
+                Long messageID = (Long) iterator.next();
                 //If we don't have a gloabl reference for this message then 
there is only a single enqueue
                 if (_idToQueues.get(messageID) == null)
                 {
                     // Add the removal of the message to this transaction
                     _delegate.removeMessage(context,messageID);
                     // Remove this message ID as we have processed it so we 
don't reprocess after the main commmit
-                    messageMap.remove(messageID);
+                    iterator.remove();
                 }
             }
         }
@@ -179,6 +183,15 @@
                     }
                     else
                     {
+                        //When a message is on more than one queue it is 
possible that this code section is exectuted
+                        // by one thread per enqueue.
+                        // It is however, thread safe because there is only 
removes being performed and so the
+                        // last thread that does the remove will see the empty 
queue and remove the message
+                        // At this stage there is nothing that is going to 
cause this operation to abort. So we don't
+                        // need to worry about any potential adds.
+                        // The message will no longer be enqueued as that 
operation has been committed before now so
+                        // this is clean up of the data.                       
 
+
                         // Update the enqueued list
                         enqueuedList.remove(queue);
 
@@ -195,6 +208,8 @@
 
             //Commit the removes on the delegate.
             _delegate.commitTran(removeContext);
+            // Mark this context as committed.
+            removeContext.commitTransaction();
         }
         finally
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to