Author: dblevins
Date: Wed May  5 23:09:02 2010
New Revision: 941527

URL: http://svn.apache.org/viewvc?rev=941527&view=rev
Log:
Wonderful patch from Ivan, OPENEJB-1246, that really tightens up the thread 
safety of the MemoryTimerStore
Thanks, Ivan!

Modified:
    
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java

Modified: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java?rev=941527&r1=941526&r2=941527&view=diff
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
 (original)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
 Wed May  5 23:09:02 2010
@@ -17,31 +17,34 @@
 
 package org.apache.openejb.core.timer;
 
-import org.apache.openejb.util.LogCategory;
-import org.apache.openejb.util.Logger;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Date;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
 
 public class MemoryTimerStore implements TimerStore {
     private static final Logger log = Logger.getInstance(LogCategory.TIMER, 
"org.apache.openejb.util.resources");
     private final Map<Long,TimerData> taskStore = new 
ConcurrentHashMap<Long,TimerData>();
-    private final Map<Transaction,TimerDataView> tasksByTransaction = new 
HashMap<Transaction, TimerDataView>();
+    private final Map<Transaction,TimerDataView> tasksByTransaction = new 
ConcurrentHashMap<Transaction, TimerDataView>();
     private final AtomicLong counter = new AtomicLong(0);
 
     private final TransactionManager transactionManager;
@@ -75,7 +78,7 @@ public class MemoryTimerStore implements
         Collection<TimerData> timerDatas = new 
ArrayList<TimerData>(tasks.getTasks().values());
         return timerDatas;
     }
-    
+
     // used to re-register a TimerData, if a cancel() is rolledback...
     public void addTimerData(TimerData timerData) throws TimerStoreException {
         getTasks().addTimerData(timerData);
@@ -145,71 +148,78 @@ public class MemoryTimerStore implements
     }
 
     private class TxTimerDataView implements Synchronization, TimerDataView {
-        private final Map<Long,TimerData> tasks;
         private final Map<Long,TimerData> add = new TreeMap<Long,TimerData>();
         private final Set<Long> remove = new TreeSet<Long>();
-
+        private final Lock lock = new ReentrantLock();
+        private final RuntimeException concurentException;
+        private final WeakReference<Transaction> tansactionReference;
+
+        /**
+         * This class is not designed to be multi-threaded under the assumption
+         * that transactions are single-threaded and this view is only supposed
+         * to be used within the transaction for which it was created.
+         *
+         * @param transaction
+         * @throws TimerStoreException
+         */
         public TxTimerDataView(Transaction transaction) throws 
TimerStoreException {
+            // We're going to lock this timer inside this transaction and 
essentially
+            // never let it go.  Any other threads attempting to invoke this 
object
+            // will immediately throw an exception.
+            lock.lock();
+            concurentException = new IllegalThreadStateException("Object can 
only be invoked by Thread[" + Thread.currentThread().getName() + "] in 
Transaction[" + transaction + "]");
+            concurentException.fillInStackTrace();
             try {
                 transaction.registerSynchronization(this);
+                tansactionReference = new 
WeakReference<Transaction>(transaction);
             } catch (RollbackException e) {
                 throw new TimerStoreException("Transaction has been rolled 
back");
             } catch (SystemException e) {
                 throw new TimerStoreException("Error registering transaction 
synchronization callback");
             }
-            this.tasks = new TreeMap<Long,TimerData>(taskStore);
+        }
+
+        private void checkThread() {
+            if (!lock.tryLock()) throw new IllegalStateException("Illegal 
access by Thread[" + Thread.currentThread().getName() + "]", 
concurentException);
         }
 
         public Map<Long,TimerData> getTasks() {
-            return Collections.unmodifiableMap(tasks);
+            checkThread();
+            TreeMap<Long, TimerData> allTasks = new TreeMap<Long, TimerData>();
+            allTasks.putAll(taskStore);
+            for (Long key : remove) allTasks.remove(key);
+            allTasks.putAll(add);
+            return Collections.unmodifiableMap(allTasks);
         }
 
         public void addTimerData(TimerData timerData) {
+            checkThread();
             Long timerId = new Long(timerData.getId());
 
-            // if it was previously removed...
-            if (remove.contains(timerId)) {
-                // remove it from the remove set
-                remove.remove(timerId);
-                // put the work back into the current tasks set
-                tasks.put(timerId, timerData);
-
-            } else {
-                // if it is not in the current tasks
-                if (!tasks.containsKey(timerId)) {
-                    // put it in the add set
-                    add.put(timerId, timerData);
-
-                    // put the work into the current tasks set
-                    tasks.put(timerId, timerData);
-                }
-            }
+            // remove it from the remove set, if it is there
+            remove.remove(timerId);
+
+            // put it in the add set
+            add.put(timerId, timerData);
         }
 
         public void removeTimerData(Long timerId) {
-            // if it was previously added...
-            if (add.containsKey(timerId)) {
-                // remove it from the add set
-                add.remove(timerId);
-                // re-remove the work from the current tasks set
-                tasks.remove(timerId);
-
-            } else {
-                // if it is in the current tasks
-                if (tasks.containsKey(timerId)) {
-                    // add it in the remove set
-                    remove.add(timerId);
-
-                    // remove the work from the current tasks set
-                    tasks.remove(timerId);
-                }
-            }
+            checkThread();
+
+            // remove it from the add set, if it is there
+            add.remove(timerId);
+
+            // add it in the remove set
+            remove.add(timerId);
         }
 
         public void beforeCompletion() {
+            checkThread();
         }
 
         public void afterCompletion(int status) {
+            checkThread();
+
             // if the tx was not committed, there is nothign to update
             if (status != Status.STATUS_COMMITTED) return;
 
@@ -219,6 +229,7 @@ public class MemoryTimerStore implements
             // remove work
             taskStore.keySet().removeAll(remove);
 
+            tasksByTransaction.remove(tansactionReference.get());
         }
     }
 }


Reply via email to