Author: dblevins
Date: Wed May 5 23:09:02 2010
New Revision: 941527
URL: http://svn.apache.org/viewvc?rev=941527&view=rev
Log:
Wonderful patch from Ivan, OPENEJB-1246, that really tightens up the thread
safety of the MemoryTimerStore
Thanks, Ivan!
Modified:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
Modified:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java?rev=941527&r1=941526&r2=941527&view=diff
==============================================================================
---
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
(original)
+++
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/timer/MemoryTimerStore.java
Wed May 5 23:09:02 2010
@@ -17,31 +17,34 @@
package org.apache.openejb.core.timer;
-import org.apache.openejb.util.LogCategory;
-import org.apache.openejb.util.Logger;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
public class MemoryTimerStore implements TimerStore {
private static final Logger log = Logger.getInstance(LogCategory.TIMER,
"org.apache.openejb.util.resources");
private final Map<Long,TimerData> taskStore = new
ConcurrentHashMap<Long,TimerData>();
- private final Map<Transaction,TimerDataView> tasksByTransaction = new
HashMap<Transaction, TimerDataView>();
+ private final Map<Transaction,TimerDataView> tasksByTransaction = new
ConcurrentHashMap<Transaction, TimerDataView>();
private final AtomicLong counter = new AtomicLong(0);
private final TransactionManager transactionManager;
@@ -75,7 +78,7 @@ public class MemoryTimerStore implements
Collection<TimerData> timerDatas = new
ArrayList<TimerData>(tasks.getTasks().values());
return timerDatas;
}
-
+
// used to re-register a TimerData, if a cancel() is rolledback...
public void addTimerData(TimerData timerData) throws TimerStoreException {
getTasks().addTimerData(timerData);
@@ -145,71 +148,78 @@ public class MemoryTimerStore implements
}
private class TxTimerDataView implements Synchronization, TimerDataView {
- private final Map<Long,TimerData> tasks;
private final Map<Long,TimerData> add = new TreeMap<Long,TimerData>();
private final Set<Long> remove = new TreeSet<Long>();
-
+ private final Lock lock = new ReentrantLock();
+ private final RuntimeException concurentException;
+ private final WeakReference<Transaction> tansactionReference;
+
+ /**
+ * This class is not designed to be multi-treaded under the assumption
+ * that transactions are single-threaded and this view is only supposed
+ * to be used within the transaction for which it was created.
+ *
+ * @param transaction
+ * @throws TimerStoreException
+ */
public TxTimerDataView(Transaction transaction) throws
TimerStoreException {
+ // We're going to lock this timer inside this transaction and
essentially
+ // never let it go. Any other threads attempting to invoke this
object
+ // will immediately throw an exception.
+ lock.lock();
+ concurentException = new IllegalThreadStateException("Object can
only be invoked by Thread[" + Thread.currentThread().getName() + "] in
Transaction[" + transaction + "]");
+ concurentException.fillInStackTrace();
try {
transaction.registerSynchronization(this);
+ tansactionReference = new
WeakReference<Transaction>(transaction);
} catch (RollbackException e) {
throw new TimerStoreException("Transaction has been rolled
back");
} catch (SystemException e) {
throw new TimerStoreException("Error registering transaction
synchronization callback");
}
- this.tasks = new TreeMap<Long,TimerData>(taskStore);
+ }
+
+ private void checkThread() {
+ if (!lock.tryLock()) throw new IllegalStateException("Illegal
access by Thread[" + Thread.currentThread().getName() + "]",
concurentException);
}
public Map<Long,TimerData> getTasks() {
- return Collections.unmodifiableMap(tasks);
+ checkThread();
+ TreeMap<Long, TimerData> allTasks = new TreeMap<Long, TimerData>();
+ allTasks.putAll(taskStore);
+ for (Long key : remove) allTasks.remove(key);
+ allTasks.putAll(add);
+ return Collections.unmodifiableMap(allTasks);
}
public void addTimerData(TimerData timerData) {
+ checkThread();
Long timerId = new Long(timerData.getId());
- // if it was previously removed...
- if (remove.contains(timerId)) {
- // remove it from the remove set
- remove.remove(timerId);
- // put the work back into the current tasks set
- tasks.put(timerId, timerData);
-
- } else {
- // if it is not in the current tasks
- if (!tasks.containsKey(timerId)) {
- // put it in the add set
- add.put(timerId, timerData);
-
- // put the work into the current tasks set
- tasks.put(timerId, timerData);
- }
- }
+ // remove it from the remove set, if it is there
+ remove.remove(timerId);
+
+ // put it in the add set
+ add.put(timerId, timerData);
}
public void removeTimerData(Long timerId) {
- // if it was previously added...
- if (add.containsKey(timerId)) {
- // remove it from the add set
- add.remove(timerId);
- // re-remove the work from the current tasks set
- tasks.remove(timerId);
-
- } else {
- // if it is in the current tasks
- if (tasks.containsKey(timerId)) {
- // add it in the remove set
- remove.add(timerId);
-
- // remove the work from the current tasks set
- tasks.remove(timerId);
- }
- }
+ checkThread();
+
+ // remove it from the add set, if it is there
+ add.remove(timerId);
+
+ // add it in the remove set
+ remove.add(timerId);
}
public void beforeCompletion() {
+ checkThread();
}
public void afterCompletion(int status) {
+ checkThread();
+
// if the tx was not committed, there is nothign to update
if (status != Status.STATUS_COMMITTED) return;
@@ -219,6 +229,7 @@ public class MemoryTimerStore implements
// remove work
taskStore.keySet().removeAll(remove);
+ tasksByTransaction.remove(tansactionReference.get());
}
}
}