Author: ozeigermann
Date: Mon Jul 23 10:40:10 2007
New Revision: 558810

URL: http://svn.apache.org/viewvc?view=rev&rev=558810
Log:
Added first version of lock manager that supports deadlock detection. 
This has not been properly tested and still misses reasonable thread-safety 
protection.

Modified:
    
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
    
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java

Modified: 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
URL: 
http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java?view=diff&rev=558810&r1=558809&r2=558810
==============================================================================
--- 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
 (original)
+++ 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
 Mon Jul 23 10:40:10 2007
@@ -43,7 +43,7 @@
         /**
          * Locking request canceled because of deadlock.
          */
-        DEADLOCK_VICTIM,
+        WOULD_DEADLOCK,
         
         /**
          * A conflict between two optimistic transactions occured.

Modified: 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
URL: 
http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java?view=diff&rev=558810&r1=558809&r2=558810
==============================================================================
--- 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
 (original)
+++ 
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
 Mon Jul 23 10:40:10 2007
@@ -16,10 +16,13 @@
  */
 package org.apache.commons.transaction.locking;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -31,45 +34,15 @@
 
     protected ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock> locks = new 
ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock>();
 
-    protected Map<Thread, Set<Lock>> locksForThreads = new 
ConcurrentHashMap<Thread, Set<Lock>>();
+    protected Map<Thread, CopyOnWriteArraySet<Lock>> locksForThreads = new 
ConcurrentHashMap<Thread, CopyOnWriteArraySet<Lock>>();
 
-    protected Map<ReadWriteLock, Set<Thread>> threadsForLocks = new 
ConcurrentHashMap<ReadWriteLock, Set<Thread>>();
+    protected ConcurrentHashMap<Lock, Set<Thread>> threadsForLocks = new 
ConcurrentHashMap<Lock, Set<Thread>>();
 
     protected Map<Thread, Long> effectiveGlobalTimeouts = new 
ConcurrentHashMap<Thread, Long>();
 
-    // TODO
-    public Iterable<ReadWriteLock> orderLocks() {
-        Set<Lock> locks = locksForThreads.get(Thread.currentThread());
-        if (locks == null) {
-            throw new IllegalStateException("lock() can only be called after 
startWork()");
-        }
-
-        return null;
-
-    }
-
     @Override
     public void endWork() {
-        Set<Lock> locks = locksForThreads.get(Thread.currentThread());
-        // graceful reaction...
-        if (locks == null) {
-            return;
-        }
-        for (Lock lock : locks) {
-            lock.unlock();
-
-            // FIXME: We need to do this atomically
-            Set<Thread> threadsForThisLock = threadsForLocks.get(lock);
-            if (threadsForThisLock != null) {
-                threadsForThisLock.remove(Thread.currentThread());
-                if (threadsForThisLock.isEmpty()) {
-                    threadsForLocks.remove(lock);
-                    locks.remove(lock);
-                }
-            }
-        }
-
-        locksForThreads.remove(Thread.currentThread());
+        release(Thread.currentThread());
     }
 
     @Override
@@ -77,7 +50,7 @@
         if (isWorking()) {
             throw new IllegalStateException("work has already been started");
         }
-        locksForThreads.put(Thread.currentThread(), new HashSet<Lock>());
+        locksForThreads.put(Thread.currentThread(), new 
CopyOnWriteArraySet<Lock>());
 
         long timeoutMSecs = unit.toMillis(timeout);
         long now = System.currentTimeMillis();
@@ -91,8 +64,8 @@
 
     }
 
-    protected long computeRemainingTime() {
-        long timeout = effectiveGlobalTimeouts.get(Thread.currentThread());
+    protected long computeRemainingTime(Thread thread) {
+        long timeout = effectiveGlobalTimeouts.get(thread);
         long now = System.currentTimeMillis();
         long remaining = timeout - now;
         return remaining;
@@ -147,10 +120,8 @@
 
     @Override
     public void lock(M managedResource, K key, boolean exclusive) throws 
LockException {
-        long remainingTime = computeRemainingTime();
-        if (remainingTime < 0) {
-            throw new LockException(LockException.Code.TIMED_OUT);
-        }
+        long remainingTime = computeRemainingTime(Thread.currentThread());
+
         boolean locked = tryLockInternal(managedResource, key, exclusive, 
remainingTime,
                 TimeUnit.MILLISECONDS);
         if (!locked) {
@@ -165,6 +136,8 @@
 
     protected boolean tryLockInternal(M managedResource, K key, boolean 
exclusive, long time,
             TimeUnit unit) throws LockException {
+        reportTimeout(Thread.currentThread());
+
         KeyEntry<K, M> entry = new KeyEntry<K, M>(key, managedResource);
         ReadWriteLock rwlock = putIfAbsent(entry, create());
         Set<Lock> locks = locksForThreads.get(Thread.currentThread());
@@ -174,20 +147,171 @@
 
         Lock lock = exclusive ? rwlock.writeLock() : rwlock.readLock();
 
+        boolean locked;
+        if (time == 0) {
+            locked = lock.tryLock();
+        } else {
+            locked = 
doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(lock, 
unit.toMillis(time));
+        }
+        if (locked) {
+            locks.add(lock);
+            Set<Thread> threads = threadsForLocks.get(lock);
+            if (threads == null) {
+                threads = new HashSet<Thread>();
+                Set<Thread> concurrentlyInsertedThreads = threadsForLocks
+                        .putIfAbsent(lock, threads);
+                if (concurrentlyInsertedThreads != null)
+                    threads = concurrentlyInsertedThreads;
+            }
+            threads.add(Thread.currentThread());
+        }
+        return locked;
+    }
+
+    protected boolean 
doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(Lock lock,
+            long timeMsecs) throws LockException {
+
+        // This algorithm is devided into three parts:
+        // Note: We can be interrupted most of the time
+        //
+        // I prewait:
+        // Wait a fraktion of the time to see if we can acquire
+        // the lock in short time. If we can all is good and we exit
+        // signalling success. If not we need to get into a more resource
+        // consuming phase.
+        //
+        // II clearing of timed out thtreads / deadlock detection:
+        // As we have not been able to acquire the lock, yet, maybe there is
+        // deadlock. Clear all threads already timed out and afterwards
+        // check for a deadlock state. If there is one report it with an
+        // exception. If not we enter the final phase.
+        // 
+        // III real wait:
+        // Everything is under control, we were just a little bit too
+        // impatient. So wait for the remaining time and see if the can get
+        // the lock
+        // 
+
         try {
             boolean locked;
-            if (time == 0) {
-                locked = lock.tryLock();
-            } else {
-                locked = lock.tryLock(time, unit);
-            }
-            if (locked) {
-                locks.add(lock);
+
+            // I prewait
+
+            long startTime = System.currentTimeMillis();
+
+            // TODO this heuristic devisor really should be configurable
+            long preWaitTime = timeMsecs / 5;
+            locked = lock.tryLock(preWaitTime, TimeUnit.MILLISECONDS);
+            if (locked)
+                return true;
+
+            // II deadlock detect
+            cancelAllTimedOut();
+            if (wouldDeadlock(Thread.currentThread(), new HashSet<Thread>())) {
+                throw new LockException(LockException.Code.WOULD_DEADLOCK);
             }
+
+            // III real wait
+            long now = System.currentTimeMillis();
+            long remainingWaitTime = timeMsecs - (now - startTime);
+            if (remainingWaitTime < 0)
+                return false;
+
+            locked = lock.tryLock(remainingWaitTime, TimeUnit.MILLISECONDS);
             return locked;
         } catch (InterruptedException e) {
-            throw new LockException(Code.INTERRUPTED, key);
+            throw new LockException(Code.INTERRUPTED);
         }
+
+    }
+
+    protected boolean wouldDeadlock(Thread thread, Set<Thread> path) {
+        path.add(thread);
+        // these are our locks
+        // Note: No need to make a copy as we can be sure to iterate on our
+        // private
+        // version, as this is a CopyOnWriteArraySet!
+        CopyOnWriteArraySet<Lock> locks = locksForThreads.get(thread);
+        for (Lock lock : locks) {
+            // these are the ones waiting for one of our locks
+            // and if they wait, they wait because of me!
+            Collection<Thread> conflicts = 
getConflictingWaiters((ReentrantReadWriteLock) lock);
+            for (Thread conflictThread : conflicts) {
+                // this means, we have found a cycle in the wait graph
+                if (path.contains(conflictThread)) {
+                    return true;
+                } else if (wouldDeadlock(conflictThread, path)) {
+                    return true;
+                }
+            }
+        }
+
+        path.remove(thread);
+        return false;
+    }
+
+    protected Collection<Thread> getConflictingWaiters(ReentrantReadWriteLock 
lock) {
+        Collection<Thread> result = new ArrayList<Thread>();
+        // Consider every thread that holds at least one lock!
+        // Caution: We can not use "threadsForLocks" as the waiting threads
+        // have not yet acquired the lock and thus are not part of the map.
+        // An alternative algorithm could also remember the threads waiting for
+        // a lock
+        Collection<Thread> threadsWithLocks = locksForThreads.keySet();
+        for (Thread thread : threadsWithLocks) {
+            if (lock.hasQueuedThread(thread)) {
+                result.add(thread);
+            }
+        }
+        return result;
+    }
+
+    protected void reportTimeout(Thread thread) throws LockException {
+        if (hasTimedOut(thread)) {
+            throw new LockException(LockException.Code.TIMED_OUT);
+        }
+    }
+
+    protected void cancelAllTimedOut() {
+        Set<Thread> threads = effectiveGlobalTimeouts.keySet();
+        for (Thread thread : threads) {
+            if (hasTimedOut(thread)) {
+                // TODO: We need to record this thread has timed out to produce
+                // a meaningful exception when it tries to continue its work
+                release(thread);
+                thread.interrupt();
+            }
+
+        }
+    }
+
+    protected boolean hasTimedOut(Thread thread) {
+        long remainingTime = computeRemainingTime(thread);
+        return (remainingTime < 0);
+
+    }
+
+    protected void release(Thread thread) {
+        Set<Lock> locks = locksForThreads.get(thread);
+        // graceful reaction...
+        if (locks == null) {
+            return;
+        }
+        for (Lock lock : locks) {
+            lock.unlock();
+
+            // FIXME: We need to do this atomically
+            Set<Thread> threadsForThisLock = threadsForLocks.get(lock);
+            if (threadsForThisLock != null) {
+                threadsForThisLock.remove(Thread.currentThread());
+                if (threadsForThisLock.isEmpty()) {
+                    threadsForLocks.remove(lock);
+                    locks.remove(lock);
+                }
+            }
+        }
+
+        locksForThreads.remove(thread);
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to