Author: toad
Date: 2007-03-09 22:09:18 +0000 (Fri, 09 Mar 2007)
New Revision: 12077
Modified:
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
Remove the Big Fat Lock (BFL).
Berkeley DB's (BDB's) own record-level locking should be sufficient.
The BFL is causing problems in practice because of the slowdown introduced by
recently enabling durability.
That said, this commit makes some fairly large changes, so expect trouble...
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2007-03-09
21:22:34 UTC (rev 12076)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2007-03-09
22:09:18 UTC (rev 12077)
@@ -34,6 +34,7 @@
import freenet.crypt.RandomSource;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
+import freenet.keys.Key;
import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
import freenet.keys.NodeSSK;
@@ -53,10 +54,6 @@
*
* TODO: Fix ugly Exception handling
*
- *
- * FIXME:
- * This should in theory be threadsafe without the Big Lock.
- * Remove the big lock, when we are sure that the major issues are sorted.
*/
public class BerkeleyDBFreenetStore implements FreenetStore {
@@ -589,7 +586,7 @@
private long checkForHoles(long blocksInFile, boolean dontTruncate)
throws DatabaseException {
System.err.println("Checking for holes in database...
"+blocksInFile+" blocks in file");
- WrapperManager.signalStarting(5*60*1000 +
(int)blocksInFile*100); // 10/sec
+ WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE,
5*60*1000 + blocksInFile*100)); // 10/sec
long holes = 0;
long maxPresent = 0;
freeBlocks.clear();
@@ -669,7 +666,7 @@
if(!dontCheck)
checkForHoles(maxChkBlocks, true);
- WrapperManager.signalStarting(5*60*1000 + (int)chkBlocksInStore * 100);
// 10 per second
+ WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE,
5*60*1000 + chkBlocksInStore * 100))); // 10 per second
long realSize = countCHKBlocksFromFile();
@@ -1196,9 +1193,11 @@
* @param dontPromote If true, don't promote data if fetched.
* @return null if there is no such block stored, otherwise the block.
*/
- public synchronized CHKBlock fetch(NodeCHK chk, boolean dontPromote)
throws IOException {
- if(closed)
- return null;
+ public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException
{
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
byte[] routingkey = chk.getRoutingKey();
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
@@ -1315,9 +1314,11 @@
* @param dontPromote If true, don't promote data if fetched.
* @return null if there is no such block stored, otherwise the block.
*/
- public synchronized SSKBlock fetch(NodeSSK chk, boolean dontPromote)
throws IOException {
- if(closed)
- return null;
+ public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException
{
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
byte[] routingkey = chk.getRoutingKey();
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
@@ -1328,8 +1329,12 @@
t = environment.beginTransaction(null,null);
c = chkDB.openCursor(t,null);
+ // Explanation of locking is in fetchPubKey.
+ // Basically, locking the whole element saves us all sorts of
trouble, especially
+ // since we will usually be writing here if only to promote it.
if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
!=OperationStatus.SUCCESS) {
+ // FIXME we are supposed to close the cursor first,
right?
c.close();
c = null;
t.abort();
@@ -1360,6 +1365,7 @@
DatabaseEntry updateDBE = new DatabaseEntry();
storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
c.putCurrent(updateDBE);
+ // FIXME we are supposed to close the cursor
first, right?
c.close();
c = null;
t.commit();
@@ -1379,9 +1385,9 @@
}catch(SSKVerifyException ex){
Logger.normal(this, "SSKBlock: Does not verify
("+ex+"), setting accessTime to 0 for : "+chk, ex);
+ chkDB.delete(t, routingkeyDBE);
c.close();
c = null;
- chkDB.delete(t, routingkeyDBE);
t.commit();
t = null;
addFreeBlock(storeBlock.offset, true, "SSK does not
verify");
@@ -1412,7 +1418,7 @@
// FIXME do this with interfaces etc.
- public synchronized DSAPublicKey fetchPubKey(byte[] hash, boolean
dontPromote) throws IOException {
+ public DSAPublicKey fetchPubKey(byte[] hash, boolean dontPromote) throws
IOException {
return fetchPubKey(hash, null, dontPromote);
}
@@ -1422,9 +1428,11 @@
* @param replacement If non-null, and the data exists but is corrupt,
replace it with this.
* @return null if there is no such block stored, otherwise the block.
*/
- public synchronized DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey
replacement, boolean dontPromote) throws IOException {
- if(closed)
- return null;
+ public DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey replacement,
boolean dontPromote) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
DatabaseEntry routingkeyDBE = new DatabaseEntry(hash);
DatabaseEntry blockDBE = new DatabaseEntry();
@@ -1435,6 +1443,10 @@
c = chkDB.openCursor(t,null);
// Lock the records as soon as we find them.
+ // RMW - nobody else may access this key until we are finished.
+ // This is advantageous as we will usually promote it and we
may replace its content;
+ // if two readers accessed it at once both might try to. Also
IIRC we can deadlock
+ // if we don't.
if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
!=OperationStatus.SUCCESS) {
c.close();
@@ -1463,6 +1475,7 @@
byte[] data = new byte[dataBlockSize];
if(logMINOR) Logger.minor(this, "Reading from store...
"+storeBlock.offset+" ("+storeBlock.recentlyUsed+ ')');
+ // When will java have pread/pwrite? :(
synchronized(chkStore) {
chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
chkStore.readFully(data);
@@ -1474,12 +1487,12 @@
} catch (CryptFormatException e) {
Logger.error(this, "Could not read key: "+e, e);
finishKey(storeBlock, c, t, routingkeyDBE, hash,
replacement);
- return null;
+ return replacement;
}
if(!Arrays.equals(block.asBytesHash(), hash)) {
finishKey(storeBlock, c, t, routingkeyDBE, hash,
replacement);
- return null;
+ return replacement;
}
// Finished, commit.
@@ -1562,7 +1575,7 @@
}
}
- public synchronized void put(CHKBlock b) throws IOException {
+ public void put(CHKBlock b) throws IOException {
NodeCHK chk = (NodeCHK) b.getKey();
CHKBlock oldBlock = fetch(chk, false);
if(oldBlock != null)
@@ -1570,7 +1583,7 @@
innerPut(b);
}
- public synchronized void put(SSKBlock b, boolean overwrite) throws
IOException, KeyCollisionException {
+ public void put(SSKBlock b, boolean overwrite) throws IOException,
KeyCollisionException {
NodeSSK ssk = (NodeSSK) b.getKey();
SSKBlock oldBlock = fetch(ssk, false);
if(oldBlock != null) {
@@ -1586,7 +1599,12 @@
}
}
- private synchronized boolean overwrite(SSKBlock b) throws IOException {
+ private boolean overwrite(SSKBlock b) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return false;
+ }
+
NodeSSK chk = (NodeSSK) b.getKey();
byte[] routingkey = chk.getRoutingKey();
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
@@ -1645,9 +1663,11 @@
/**
* Store a block.
*/
- private synchronized void innerPut(KeyBlock block) throws IOException {
- if(closed)
- return;
+ private void innerPut(KeyBlock block) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return;
+ }
byte[] routingkey = block.getKey().getRoutingKey();
byte[] data = block.getRawData();
@@ -1668,6 +1688,28 @@
t = environment.beginTransaction(null,null);
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+
+ // Check whether it already exists
+
+ OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE,
LockMode.RMW);
+
+ if(result == OperationStatus.SUCCESS || result ==
OperationStatus.KEYEXIST) {
+ // Key already exists!
+ // But is it valid?
+ t.abort();
+ if(fetchKey(block.getKey(), false) != null) return; //
old key was valid, we are not overwriting
+ // If we are here, it was corrupt, or it was just
deleted, so we can replace it.
+ innerPut(block);
+ return;
+ } else if(result == OperationStatus.KEYEMPTY) {
+ Logger.error(this, "Got KEYEMPTY - record deleted?
Shouldn't be possible with record locking...!");
+ // Put it in anyway
+ } else if(result == OperationStatus.NOTFOUND) {
+ // Good
+ } else
+ throw new IllegalStateException("Unknown operation
status: "+result);
+
writeBlock(header, data, t, routingkeyDBE);
t.commit();
@@ -1691,7 +1733,14 @@
}
}
- private void overwriteLRUBlock(byte[] header, byte[] data, Transaction t,
DatabaseEntry routingkeyDBE) throws DatabaseException, IOException {
+ private KeyBlock fetchKey(Key key, boolean b) throws IOException {
+ if(key instanceof NodeCHK)
+ return fetch((NodeCHK)key, b);
+ else
+ return fetch((NodeSSK)key, b);
+ }
+
+ private void overwriteLRUBlock(byte[] header, byte[] data, Transaction
t, DatabaseEntry routingkeyDBE) throws DatabaseException, IOException {
// Overwrite an other block
Cursor c = chkDB_accessTime.openCursor(t,null);
DatabaseEntry keyDBE = new DatabaseEntry();
@@ -1753,7 +1802,7 @@
return true;
}
- private synchronized void checkSecondaryDatabaseError(Throwable ex) {
+ private void checkSecondaryDatabaseError(Throwable ex) {
if((ex instanceof DatabaseException) && (ex.getMessage() != null &&
ex.getMessage().indexOf("missing key in the primary database") > -1)) {
try {
fixSecondaryFile.createNewFile();
@@ -1771,18 +1820,18 @@
/**
* Store a pubkey.
*/
- public synchronized void put(byte[] hash, DSAPublicKey key) throws
IOException {
- DSAPublicKey k = fetchPubKey(hash, key, true);
- if(k == null)
- innerPut(hash, key);
+ public void put(byte[] hash, DSAPublicKey key) throws IOException {
+ innerPut(hash, key);
}
/**
* Store a block.
*/
- private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {
- if(closed)
- return;
+ private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return;
+ }
byte[] routingkey = hash;
byte[] data = key.asPaddedBytes();
@@ -1790,7 +1839,6 @@
if(!(Arrays.equals(hash, key.asBytesHash()))) {
Logger.error(this, "Invalid hash!: "+HexUtil.bytesToHex(hash)+"
: "+key.asBytesHash());
}
-
if(data.length!=dataBlockSize) {
Logger.error(this, "This data is "+data.length+" bytes. Should
be "+dataBlockSize);
@@ -1802,7 +1850,28 @@
try{
t = environment.beginTransaction(null,null);
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ // Check whether it already exists
+
+ OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE,
LockMode.RMW);
+
+ if(result == OperationStatus.SUCCESS || result ==
OperationStatus.KEYEXIST) {
+ // Key already exists!
+ // But is it valid?
+ t.abort();
+ if(fetchPubKey(hash, key, false) != null) return; //
replaced key
+ // If we are here, it was corrupt, and it got deleted
before it could be replaced.
+ innerPut(hash, key);
+ return;
+ } else if(result == OperationStatus.KEYEMPTY) {
+ Logger.error(this, "Got KEYEMPTY - record deleted?
Shouldn't be possible with record locking...!");
+ // Put it in anyway
+ } else if(result == OperationStatus.NOTFOUND) {
+ // Good
+ } else
+ throw new IllegalStateException("Unknown operation
status: "+result);
+
writeBlock(dummy, data, t, routingkeyDBE);
t.commit();