Author: j16sdiz
Date: 2008-06-04 08:54:39 +0000 (Wed, 04 Jun 2008)
New Revision: 20194
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
lock all offset of an entry atomically
(resize would deadlock, will fix in later commit)
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
---
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
2008-06-04 08:54:12 UTC (rev 20193)
+++
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
2008-06-04 08:54:39 UTC (rev 20194)
@@ -15,6 +15,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -50,7 +53,7 @@
private boolean syncBloom = true;
private BloomFilter bloomFilter;
- private static final boolean logLOCK = false;
+ private static final boolean logLOCK = true;
private static boolean logMINOR;
private static boolean logDEBUG;
@@ -116,6 +119,13 @@
configLock.readLock().lock();
try {
+ boolean locked = lockPlainKey(routingKey);
+ if (!locked) {
+ if (logDEBUG)
+ Logger.debug(this, "cannot lock key: "
+ HexUtil.bytesToHex(routingKey) + ", shutting down?");
+ return null;
+ }
+ try {
Entry entry = probeEntry(routingKey);
if (entry == null) {
@@ -123,8 +133,6 @@
return null;
}
- unlockEntry(entry.curOffset);
-
try {
StorableBlock block =
entry.getStorableBlock(routingKey, fullKey);
hits.incrementAndGet();
@@ -137,13 +145,16 @@
return null;
}
} finally {
+ unlockPlainKey(routingKey);
+ }
+ } finally {
configLock.readLock().unlock();
}
}
/**
- * Find and lock an entry with a specific routing key. <strong>You have
to unlock the entry
- * explicitly yourself!</strong>
+ * Find and lock an entry with a specific routing key. This function
would <strong>not</strong>
+ * lock the entries.
*
* @param routingKey
* @return <code>Entry</code> object
@@ -172,10 +183,6 @@
if (logDEBUG)
Logger.debug(this, "probing for i=" + i + ",
offset=" + offset[i]);
- if (!lockEntry(offset[i])) {
- Logger.error(this, "can't lock entry: " +
offset[i]);
- continue;
- }
try {
entry = readEntry(offset[i], routingKey);
if (entry != null)
@@ -184,9 +191,6 @@
// may occur on resize, silent it a bit
Logger.error(this, "EOFException on
probeEntry", e);
continue;
- } finally {
- if (entry == null)
- unlockEntry(offset[i]);
}
}
return null;
@@ -199,12 +203,18 @@
configLock.readLock().lock();
try {
+ boolean locked = lockPlainKey(routingKey);
+ if (!locked) {
+ if (logDEBUG)
+ Logger.debug(this, "cannot lock key: "
+ HexUtil.bytesToHex(routingKey) + ", shutting down?");
+ return;
+ }
+ try {
// don't use fetch(), as fetch() would do a miss++/hit++
Entry oldEntry = probeEntry(routingKey);
if (oldEntry != null) {
long oldOffset = oldEntry.curOffset;
try {
- try {
StorableBlock oldBlock =
oldEntry.getStorableBlock(routingKey, fullKey);
if (!collisionPossible)
return;
@@ -225,20 +235,12 @@
writeEntry(entry, oldOffset);
writes.incrementAndGet();
return;
- } finally {
- unlockEntry(oldOffset);
- }
}
Entry entry = new Entry(routingKey, header, data);
long[] offset = entry.getOffset();
for (int i = 0; i < offset.length; i++) {
- if (!lockEntry(offset[i])) {
- Logger.error(this, "can't lock entry: "
+ offset[i]);
- return;
- }
- try {
if (isFree(offset[i])) {
// write to free block
if (logDEBUG)
@@ -250,17 +252,9 @@
keyCount.incrementAndGet();
return;
}
- } finally {
- unlockEntry(offset[i]);
- }
}
// no free blocks, overwrite the first one
- if (!lockEntry(offset[0])) {
- Logger.error(this, "can't lock entry: " +
offset[0]);
- return;
- }
- try {
if (logDEBUG)
Logger.debug(this, "collision, write to
i=0, offset=" + offset[0]);
if (updateBloom)
@@ -268,7 +262,7 @@
writeEntry(entry, offset[0]);
writes.incrementAndGet();
} finally {
- unlockEntry(offset[0]);
+ unlockPlainKey(routingKey);
}
} finally {
configLock.readLock().unlock();
@@ -1211,17 +1205,88 @@
*/
private void unlockEntry(long offset) {
if (logDEBUG && logLOCK)
- Logger.debug(this, "unlocking " + offset);
+ Logger.debug(this, "unlocking " + offset, new
Exception("debug"));
entryLock.lock();
try {
Condition cond = lockMap.remove(offset);
+ assert cond != null;
cond.signal();
} finally {
entryLock.unlock();
}
}
+ /**
+ * Lock all possible offsets of a key. This method would release the
locks if any locking
+ * operation failed.
+ *
+ * @param plainKey
+ * @return <code>true</code> if all the offsets are locked.
+ */
+ private boolean lockPlainKey(byte[] plainKey) {
+ return lockDigestedKey(getDigestedRoutingKey(plainKey));
+ }
+
+ private void unlockPlainKey(byte[] plainKey) {
+ unlockDigestedKey(getDigestedRoutingKey(plainKey));
+ }
+
+ /**
+ * Lock all possible offsets of a key. This method would release the
locks if any locking
+ * operation failed.
+ *
+ * @param digestedKey
+ * @return <code>true</code> if all the offsets are locked.
+ */
+ private boolean lockDigestedKey(byte[] digestedKey) {
+ // use a set to prevent duplicated offsets,
+ // a sorted set to prevent deadlocks
+ SortedSet<Long> offsets = new TreeSet<Long>();
+ long[] offsetArray = getOffsetFromDigestedKey(digestedKey,
storeSize);
+ for (long offset : offsetArray)
+ offsets.add(offset);
+ if (prevStoreSize != 0) {
+ offsetArray = getOffsetFromDigestedKey(digestedKey,
prevStoreSize);
+ for (long offset : offsetArray)
+ offsets.add(offset);
+ }
+
+ Set<Long> locked = new TreeSet<Long>();
+ for (long offset : offsets) {
+ boolean status = lockEntry(offset);
+ if (!status)
+ break;
+ locked.add(offset);
+ }
+
+ if (locked.size() == offsets.size()) {
+ return true;
+ } else {
+ // failed, remove the locks
+ for (long offset : offsets)
+ unlockEntry(offset);
+ return false;
+ }
+ }
+
+ private void unlockDigestedKey(byte[] digestedKey) {
+ // use a set to prevent duplicated offsets
+ SortedSet<Long> offsets = new TreeSet<Long>();
+ long[] offsetArray = getOffsetFromDigestedKey(digestedKey,
storeSize);
+ for (long offset : offsetArray)
+ offsets.add(offset);
+ if (prevStoreSize != 0) {
+ offsetArray = getOffsetFromDigestedKey(digestedKey,
prevStoreSize);
+ for (long offset : offsetArray)
+ offsets.add(offset);
+ }
+
+ for (long offset : offsets) {
+ unlockEntry(offset);
+ }
+ }
+
public class ShutdownDB implements Runnable {
public void run() {
shutdown = true;