Author: j16sdiz
Date: 2008-06-04 08:54:39 +0000 (Wed, 04 Jun 2008)
New Revision: 20194

Modified:
   
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
lock all offset of an entry atomically
(resize would deadlock, will fix in later commit)


Modified: 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-06-04 08:54:12 UTC (rev 20193)
+++ 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-06-04 08:54:39 UTC (rev 20194)
@@ -15,6 +15,9 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -50,7 +53,7 @@
        private boolean syncBloom = true;
        private BloomFilter bloomFilter;

-       private static final boolean logLOCK = false;
+       private static final boolean logLOCK = true;
        private static boolean logMINOR;
        private static boolean logDEBUG;

@@ -116,6 +119,13 @@

                configLock.readLock().lock();
                try {
+                       boolean locked = lockPlainKey(routingKey);
+                       if (!locked) {
+                               if (logDEBUG)
+                                       Logger.debug(this, "cannot lock key: " 
+ HexUtil.bytesToHex(routingKey) + ", shutting down?");
+                               return null;
+                       }
+                       try {
                        Entry entry = probeEntry(routingKey);

                        if (entry == null) {
@@ -123,8 +133,6 @@
                                return null;
                        }

-                       unlockEntry(entry.curOffset);
-
                        try {
                                StorableBlock block = 
entry.getStorableBlock(routingKey, fullKey);
                                hits.incrementAndGet();
@@ -137,13 +145,16 @@
                                return null;
                        }
                } finally {
+                               unlockPlainKey(routingKey);
+                       }
+               } finally {
                        configLock.readLock().unlock();
                }
        }

        /**
-        * Find and lock an entry with a specific routing key. <strong>You have 
to unlock the entry
-        * explicitly yourself!</strong>
+        * Find an entry with a specific routing key. This function does <strong>not</strong>
+        * lock the entries; callers lock the key first with lockPlainKey().
         *
         * @param routingKey
         * @return <code>Entry</code> object
@@ -172,10 +183,6 @@
                        if (logDEBUG)
                                Logger.debug(this, "probing for i=" + i + ", 
offset=" + offset[i]);

-                       if (!lockEntry(offset[i])) {
-                               Logger.error(this, "can't lock entry: " + 
offset[i]);
-                               continue;
-                       }
                        try {
                                entry = readEntry(offset[i], routingKey);
                                if (entry != null)
@@ -184,9 +191,6 @@
                                // may occur on resize, silent it a bit
                                Logger.error(this, "EOFException on 
probeEntry", e);
                                continue;
-                       } finally {
-                               if (entry == null)
-                                       unlockEntry(offset[i]);
                        }
                }
                return null;
@@ -199,12 +203,18 @@

                configLock.readLock().lock();
                try {
+                       boolean locked = lockPlainKey(routingKey);
+                       if (!locked) {
+                               if (logDEBUG)
+                                       Logger.debug(this, "cannot lock key: " 
+ HexUtil.bytesToHex(routingKey) + ", shutting down?");
+                               return;
+                       }
+                       try {
                        // don't use fetch(), as fetch() would do a miss++/hit++
                        Entry oldEntry = probeEntry(routingKey);
                        if (oldEntry != null) {
                                long oldOffset = oldEntry.curOffset;
                                try {
-                                       try {
                                                StorableBlock oldBlock = 
oldEntry.getStorableBlock(routingKey, fullKey);
                                                if (!collisionPossible)
                                                        return;
@@ -225,20 +235,12 @@
                                        writeEntry(entry, oldOffset);
                                        writes.incrementAndGet();
                                        return;
-                               } finally {
-                                       unlockEntry(oldOffset);
-                               }
                        }

                        Entry entry = new Entry(routingKey, header, data);
                        long[] offset = entry.getOffset();

                        for (int i = 0; i < offset.length; i++) {
-                               if (!lockEntry(offset[i])) {
-                                       Logger.error(this, "can't lock entry: " 
+ offset[i]);
-                                       return;
-                               }
-                               try {
                                        if (isFree(offset[i])) {
                                                // write to free block
                                                if (logDEBUG)
@@ -250,17 +252,9 @@
                                                keyCount.incrementAndGet();
                                                return;
                                        }
-                               } finally {
-                                       unlockEntry(offset[i]);
-                               }
                        }

                        // no free blocks, overwrite the first one
-                       if (!lockEntry(offset[0])) {
-                               Logger.error(this, "can't lock entry: " + 
offset[0]);
-                               return;
-                       }
-                       try {
                                if (logDEBUG)
                                        Logger.debug(this, "collision, write to 
i=0, offset=" + offset[0]);
                                if (updateBloom)
@@ -268,7 +262,7 @@
                                writeEntry(entry, offset[0]);
                                writes.incrementAndGet();
                        } finally {
-                               unlockEntry(offset[0]);
+                               unlockPlainKey(routingKey);
                        }
                } finally {
                        configLock.readLock().unlock();
@@ -1211,17 +1205,88 @@
         */
        private void unlockEntry(long offset) {
                if (logDEBUG && logLOCK)
-                       Logger.debug(this, "unlocking " + offset);
+                       Logger.debug(this, "unlocking " + offset, new 
Exception("debug"));

                entryLock.lock();
                try {
                        Condition cond = lockMap.remove(offset);
+                       assert cond != null;
                        cond.signal();
                } finally {
                        entryLock.unlock();
                }
        }

+       /**
+        * Lock all possible offsets of a plain routing key. The key is digested with
+        * <code>getDigestedRoutingKey()</code> and the locking is delegated to
+        * <code>lockDigestedKey()</code>, which releases the locks again if any locking
+        * operation failed.
+        * 
+        * @param plainKey
+        *            the routing key before digesting
+        * @return <code>true</code> if all the offsets are locked.
+        */
+       private boolean lockPlainKey(byte[] plainKey) {
+               return lockDigestedKey(getDigestedRoutingKey(plainKey));
+       }
+
+       /**
+        * Unlock all possible offsets of a plain routing key. The key is digested with
+        * <code>getDigestedRoutingKey()</code> and the unlocking is delegated to
+        * <code>unlockDigestedKey()</code>.
+        * 
+        * @param plainKey
+        *            the routing key before digesting
+        */
+       private void unlockPlainKey(byte[] plainKey) {
+               unlockDigestedKey(getDigestedRoutingKey(plainKey));
+       }
+
+       /**
+        * Lock all possible offsets of a key. If any offset cannot be locked, the locks already
+        * taken by this call are released again and <code>false</code> is returned.
+        * 
+        * @param digestedKey
+        *            the digested routing key, as accepted by getOffsetFromDigestedKey()
+        * @return <code>true</code> if all the offsets are locked.
+        */
+       private boolean lockDigestedKey(byte[] digestedKey) {
+               // use a set to prevent duplicated offsets,
+               // a sorted set to prevent deadlocks
+               SortedSet<Long> offsets = new TreeSet<Long>();
+               long[] offsetArray = getOffsetFromDigestedKey(digestedKey, storeSize);
+               for (long offset : offsetArray)
+                       offsets.add(offset);
+               if (prevStoreSize != 0) {
+                       // also cover the offsets computed with the previous store size
+                       offsetArray = getOffsetFromDigestedKey(digestedKey, prevStoreSize);
+                       for (long offset : offsetArray)
+                               offsets.add(offset);
+               }
+
+               Set<Long> locked = new TreeSet<Long>();
+               for (long offset : offsets) {
+                       boolean status = lockEntry(offset);
+                       if (!status)
+                               break;
+                       locked.add(offset);
+               }
+
+               if (locked.size() == offsets.size()) {
+                       return true;
+               } else {
+                       // failed: release only the offsets this call actually locked. Unlocking the
+                       // whole candidate set would call unlockEntry() on offsets we never acquired.
+                       for (long offset : locked)
+                               unlockEntry(offset);
+                       return false;
+               }
+       }
+
+       /**
+        * Unlock all possible offsets of a key. The offsets are recomputed from
+        * <code>storeSize</code> (and <code>prevStoreSize</code>, when non-zero) and each one is
+        * passed to <code>unlockEntry()</code>.
+        * 
+        * NOTE(review): presumably both sizes must be unchanged since the matching
+        * <code>lockDigestedKey()</code> call, otherwise a different set of offsets would be
+        * released -- confirm against the resize code.
+        * 
+        * @param digestedKey
+        *            the digested routing key
+        */
+       private void unlockDigestedKey(byte[] digestedKey) {
+               // use a sorted set so that each distinct offset is unlocked exactly once
+               SortedSet<Long> offsets = new TreeSet<Long>();
+               long[] offsetArray = getOffsetFromDigestedKey(digestedKey, 
storeSize);
+               for (long offset : offsetArray)
+                       offsets.add(offset);
+               if (prevStoreSize != 0) {
+                       offsetArray = getOffsetFromDigestedKey(digestedKey, 
prevStoreSize);
+                       for (long offset : offsetArray)
+                               offsets.add(offset);
+               }
+
+               for (long offset : offsets) {
+                       unlockEntry(offset);
+               }
+       }
+
        public class ShutdownDB implements Runnable {
                public void run() {
                        shutdown = true;


Reply via email to