Author: ozeigermann Date: Mon Jul 23 10:40:10 2007 New Revision: 558810 URL: http://svn.apache.org/viewvc?view=rev&rev=558810 Log: Added first version of lock manager that supports deadlock detection. This has not been properly tested and still misses reasonable thread-safety protection.
Modified: jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java Modified: jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java URL: http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java?view=diff&rev=558810&r1=558809&r2=558810 ============================================================================== --- jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java (original) +++ jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java Mon Jul 23 10:40:10 2007 @@ -43,7 +43,7 @@ /** * Locking request canceled because of deadlock. */ - DEADLOCK_VICTIM, + WOULD_DEADLOCK, /** * A conflict between two optimistic transactions occured. Modified: jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java URL: http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java?view=diff&rev=558810&r1=558809&r2=558810 ============================================================================== --- jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java (original) +++ jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java Mon Jul 23 10:40:10 2007 @@ -16,10 +16,13 @@ */ package org.apache.commons.transaction.locking; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -31,45 +34,15 @@ protected ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock> locks = new ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock>(); - protected Map<Thread, Set<Lock>> locksForThreads = new ConcurrentHashMap<Thread, Set<Lock>>(); + protected Map<Thread, CopyOnWriteArraySet<Lock>> locksForThreads = new ConcurrentHashMap<Thread, CopyOnWriteArraySet<Lock>>(); - protected Map<ReadWriteLock, Set<Thread>> threadsForLocks = new ConcurrentHashMap<ReadWriteLock, Set<Thread>>(); + protected ConcurrentHashMap<Lock, Set<Thread>> threadsForLocks = new ConcurrentHashMap<Lock, Set<Thread>>(); protected Map<Thread, Long> effectiveGlobalTimeouts = new ConcurrentHashMap<Thread, Long>(); - // TODO - public Iterable<ReadWriteLock> orderLocks() { - Set<Lock> locks = locksForThreads.get(Thread.currentThread()); - if (locks == null) { - throw new IllegalStateException("lock() can only be called after startWork()"); - } - - return null; - - } - @Override public void endWork() { - Set<Lock> locks = locksForThreads.get(Thread.currentThread()); - // graceful reaction... - if (locks == null) { - return; - } - for (Lock lock : locks) { - lock.unlock(); - - // FIXME: We need to do this atomically - Set<Thread> threadsForThisLock = threadsForLocks.get(lock); - if (threadsForThisLock != null) { - threadsForThisLock.remove(Thread.currentThread()); - if (threadsForThisLock.isEmpty()) { - threadsForLocks.remove(lock); - locks.remove(lock); - } - } - } - - locksForThreads.remove(Thread.currentThread()); + release(Thread.currentThread()); } @Override @@ -77,7 +50,7 @@ if (isWorking()) { throw new IllegalStateException("work has already been started"); } - locksForThreads.put(Thread.currentThread(), new HashSet<Lock>()); + locksForThreads.put(Thread.currentThread(), new CopyOnWriteArraySet<Lock>()); long timeoutMSecs = unit.toMillis(timeout); long now = System.currentTimeMillis(); @@ -91,8 +64,8 @@ } - protected long computeRemainingTime() { - long timeout = effectiveGlobalTimeouts.get(Thread.currentThread()); + protected long computeRemainingTime(Thread thread) { + long timeout = effectiveGlobalTimeouts.get(thread); long now = System.currentTimeMillis(); long remaining = timeout - now; return remaining; @@ -147,10 +120,8 @@ @Override public void lock(M managedResource, K key, boolean exclusive) throws LockException { - long remainingTime = computeRemainingTime(); - if (remainingTime < 0) { - throw new LockException(LockException.Code.TIMED_OUT); - } + long remainingTime = computeRemainingTime(Thread.currentThread()); + boolean locked = tryLockInternal(managedResource, key, exclusive, remainingTime, TimeUnit.MILLISECONDS); if (!locked) { @@ -165,6 +136,8 @@ protected boolean tryLockInternal(M managedResource, K key, boolean exclusive, long time, TimeUnit unit) throws LockException { + reportTimeout(Thread.currentThread()); + KeyEntry<K, M> entry = new KeyEntry<K, M>(key, managedResource); ReadWriteLock rwlock = putIfAbsent(entry, create()); Set<Lock> locks = locksForThreads.get(Thread.currentThread()); @@ -174,20 +147,171 @@ Lock lock = exclusive ? rwlock.writeLock() : rwlock.readLock(); + boolean locked; + if (time == 0) { + locked = lock.tryLock(); + } else { + locked = doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(lock, unit.toMillis(time)); + } + if (locked) { + locks.add(lock); + Set<Thread> threads = threadsForLocks.get(lock); + if (threads == null) { + threads = new HashSet<Thread>(); + Set<Thread> concurrentlyInsertedThreads = threadsForLocks + .putIfAbsent(lock, threads); + if (concurrentlyInsertedThreads != null) + threads = concurrentlyInsertedThreads; + } + threads.add(Thread.currentThread()); + } + return locked; + } + + protected boolean doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(Lock lock, + long timeMsecs) throws LockException { + + // This algorithm is devided into three parts: + // Note: We can be interrupted most of the time + // + // I prewait: + // Wait a fraktion of the time to see if we can acquire + // the lock in short time. If we can all is good and we exit + // signalling success. If not we need to get into a more resource + // consuming phase. + // + // II clearing of timed out thtreads / deadlock detection: + // As we have not been able to acquire the lock, yet, maybe there is + // deadlock. Clear all threads already timed out and afterwards + // check for a deadlock state. If there is one report it with an + // exception. If not we enter the final phase. + // + // III real wait: + // Everything is under control, we were just a little bit too + // impatient. So wait for the remaining time and see if the can get + // the lock + // + try { boolean locked; - if (time == 0) { - locked = lock.tryLock(); - } else { - locked = lock.tryLock(time, unit); - } - if (locked) { - locks.add(lock); + + // I prewait + + long startTime = System.currentTimeMillis(); + + // TODO this heuristic devisor really should be configurable + long preWaitTime = timeMsecs / 5; + locked = lock.tryLock(preWaitTime, TimeUnit.MILLISECONDS); + if (locked) + return true; + + // II deadlock detect + cancelAllTimedOut(); + if (wouldDeadlock(Thread.currentThread(), new HashSet<Thread>())) { + throw new LockException(LockException.Code.WOULD_DEADLOCK); } + + // III real wait + long now = System.currentTimeMillis(); + long remainingWaitTime = timeMsecs - (now - startTime); + if (remainingWaitTime < 0) + return false; + + locked = lock.tryLock(remainingWaitTime, TimeUnit.MILLISECONDS); return locked; } catch (InterruptedException e) { - throw new LockException(Code.INTERRUPTED, key); + throw new LockException(Code.INTERRUPTED); } + + } + + protected boolean wouldDeadlock(Thread thread, Set<Thread> path) { + path.add(thread); + // these are our locks + // Note: No need to make a copy as we can be sure to iterate on our + // private + // version, as this is a CopyOnWriteArraySet! + CopyOnWriteArraySet<Lock> locks = locksForThreads.get(thread); + for (Lock lock : locks) { + // these are the ones waiting for one of our locks + // and if they wait, they wait because of me! + Collection<Thread> conflicts = getConflictingWaiters((ReentrantReadWriteLock) lock); + for (Thread conflictThread : conflicts) { + // this means, we have found a cycle in the wait graph + if (path.contains(conflictThread)) { + return true; + } else if (wouldDeadlock(conflictThread, path)) { + return true; + } + } + } + + path.remove(thread); + return false; + } + + protected Collection<Thread> getConflictingWaiters(ReentrantReadWriteLock lock) { + Collection<Thread> result = new ArrayList<Thread>(); + // Consider every thread that holds at least one lock! + // Caution: We can not use "threadsForLocks" as the waiting threads + // have not yet acquired the lock and thus are not part of the map. + // An alternative algorithm could also remember the threads waiting for + // a lock + Collection<Thread> threadsWithLocks = locksForThreads.keySet(); + for (Thread thread : threadsWithLocks) { + if (lock.hasQueuedThread(thread)) { + result.add(thread); + } + } + return result; + } + + protected void reportTimeout(Thread thread) throws LockException { + if (hasTimedOut(thread)) { + throw new LockException(LockException.Code.TIMED_OUT); + } + } + + protected void cancelAllTimedOut() { + Set<Thread> threads = effectiveGlobalTimeouts.keySet(); + for (Thread thread : threads) { + if (hasTimedOut(thread)) { + // TODO: We need to record this thread has timed out to produce + // a meaningful exception when it tries to continue its work + release(thread); + thread.interrupt(); + } + + } + } + + protected boolean hasTimedOut(Thread thread) { + long remainingTime = computeRemainingTime(thread); + return (remainingTime < 0); + + } + + protected void release(Thread thread) { + Set<Lock> locks = locksForThreads.get(thread); + // graceful reaction... + if (locks == null) { + return; + } + for (Lock lock : locks) { + lock.unlock(); + + // FIXME: We need to do this atomically + Set<Thread> threadsForThisLock = threadsForLocks.get(lock); + if (threadsForThisLock != null) { + threadsForThisLock.remove(Thread.currentThread()); + if (threadsForThisLock.isEmpty()) { + threadsForLocks.remove(lock); + locks.remove(lock); + } + } + } + + locksForThreads.remove(thread); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]