Author: toad
Date: 2006-06-20 19:28:02 +0000 (Tue, 20 Jun 2006)
New Revision: 9318
Modified:
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
Don't use the node lock to serialize accesses to the datastore.
One big lock per datastore.
Also the store code is just about ready to have no big lock, but I want to be
sure it works with a big lock first as we've been having some weird reports.
Also some minor fixes to the datastore.
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-06-20 18:52:48 UTC (rev
9317)
+++ trunk/freenet/src/freenet/node/Node.java 2006-06-20 19:28:02 UTC (rev
9318)
@@ -1812,17 +1812,15 @@
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
long startTime = System.currentTimeMillis();
- synchronized(this) {
- if(cache) {
- try {
- chkDatastore.put(block);
- } catch (IOException e) {
- Logger.error(this, "Datastore failure:
"+e, e);
- }
+ if(cache) {
+ try {
+ chkDatastore.put(block);
+ } catch (IOException e) {
+ Logger.error(this, "Datastore failure: "+e, e);
}
- is =
makeInsertSender((NodeCHK)block.getClientKey().getNodeKey(),
- MAX_HTL, uid, null, headers, prb,
false, lm.getLocation().getValue(), cache);
}
+ is =
makeInsertSender((NodeCHK)block.getClientKey().getNodeKey(),
+ MAX_HTL, uid, null, headers, prb, false,
lm.getLocation().getValue(), cache);
boolean hasForwardedRejectedOverload = false;
// Wait for status
while(true) {
@@ -1914,19 +1912,17 @@
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
long startTime = System.currentTimeMillis();
- synchronized(this) {
- if(cache) {
- try {
- sskDatastore.put(block, false);
- } catch (IOException e) {
- Logger.error(this, "Datastore failure:
"+e, e);
- } catch (KeyCollisionException e) {
- throw new
LowLevelPutException(LowLevelPutException.COLLISION);
- }
+ if(cache) {
+ try {
+ sskDatastore.put(block, false);
+ } catch (IOException e) {
+ Logger.error(this, "Datastore failure: "+e, e);
+ } catch (KeyCollisionException e) {
+ throw new
LowLevelPutException(LowLevelPutException.COLLISION);
}
- is = makeInsertSender(block,
- MAX_HTL, uid, null, false,
lm.getLocation().getValue(), cache);
}
+ is = makeInsertSender(block,
+ MAX_HTL, uid, null, false,
lm.getLocation().getValue(), cache);
boolean hasForwardedRejectedOverload = false;
// Wait for status
while(true) {
@@ -1976,9 +1972,7 @@
if(is.hasCollided()) {
// Store it locally so it can be fetched immediately,
and overwrites any locally inserted.
try {
- synchronized(this) {
- sskDatastore.put(is.getBlock(), true);
- }
+ sskDatastore.put(is.getBlock(), true);
} catch (KeyCollisionException e) {
// Impossible
} catch (IOException e) {
@@ -2368,7 +2362,7 @@
}
}
- public synchronized SSKBlock fetch(NodeSSK key) {
+ public SSKBlock fetch(NodeSSK key) {
// Can we just lock on sskDatastore here? **FIXME**
try {
return sskDatastore.fetch(key, false);
@@ -2378,8 +2372,7 @@
}
}
- public synchronized CHKBlock fetch(NodeCHK key) {
- // Can we just lock on chkDatastore here? **FIXME**
+ public CHKBlock fetch(NodeCHK key) {
try {
return chkDatastore.fetch(key, false);
} catch (IOException e) {
@@ -2391,8 +2384,7 @@
/**
* Store a datum.
*/
- public synchronized void store(CHKBlock block) {
- // Can we just lock on chkDatastore here? **FIXME**
+ public void store(CHKBlock block) {
try {
chkDatastore.put(block);
} catch (IOException e) {
@@ -2400,7 +2392,7 @@
}
}
- public synchronized void store(SSKBlock block) throws
KeyCollisionException {
+ public void store(SSKBlock block) throws KeyCollisionException {
try {
sskDatastore.put(block, false);
cacheKey(((NodeSSK)block.getKey()).getPubKeyHash(),
((NodeSSK)block.getKey()).getPubKey());
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-06-20
18:52:48 UTC (rev 9317)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-06-20
19:28:02 UTC (rev 9318)
@@ -44,6 +44,11 @@
* @author tubbie
*
* TODO: Fix ugly Exception handling
+ *
+ *
+ * FIXME:
+ * This should in theory be threadsafe without the Big Lock.
+ * Remove the big lock, when we are sure that the major issues are sorted.
*/
public class BerkeleyDBFreenetStore implements FreenetStore {
@@ -56,6 +61,7 @@
private final File fixSecondaryFile;
private long chkBlocksInStore;
+ private final Object chkBlocksInStoreLock = new Object();
private long maxChkBlocks;
private final Database chkDB;
private final Database chkDB_accessTime;
@@ -63,6 +69,7 @@
private final RandomAccessFile chkStore;
private long lastRecentlyUsed;
+ private final Object lastRecentlyUsedSync = new Object();
private boolean closed = false;
@@ -235,8 +242,7 @@
* @param dontPromote If true, don't promote data if fetched.
* @return null if there is no such block stored, otherwise the block.
*/
- public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException
- {
+ public synchronized CHKBlock fetch(NodeCHK chk, boolean dontPromote)
throws IOException {
if(closed)
return null;
@@ -261,6 +267,8 @@
!=OperationStatus.SUCCESS) {
c.close();
t.abort();
+ t = null;
+ c = null;
return null;
}
@@ -272,7 +280,7 @@
byte[] data = new byte[dataBlockSize];
synchronized(chkStore) {
long seekTarget =
storeBlock.offset*(long)(dataBlockSize+headerBlockSize);
- try {
+ try {
chkStore.seek(seekTarget);
} catch (IOException ioe) {
if(seekTarget > (2*1024*1024*1024)) {
@@ -297,9 +305,13 @@
c.putCurrent(updateDBE);
c.close();
t.commit();
+ c = null;
+ t = null;
}else{
c.close();
t.abort();
+ c = null;
+ t = null;
}
Logger.minor(this, "Get key: "+chk);
@@ -337,8 +349,7 @@
* @param dontPromote If true, don't promote data if fetched.
* @return null if there is no such block stored, otherwise the block.
*/
- public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException
- {
+ public synchronized SSKBlock fetch(NodeSSK chk, boolean dontPromote)
throws IOException {
if(closed)
return null;
@@ -355,6 +366,8 @@
!=OperationStatus.SUCCESS) {
c.close();
t.abort();
+ c = null;
+ t = null;
return null;
}
@@ -373,17 +386,20 @@
block = new SSKBlock(data,header,chk, true);
- if(!dontPromote)
- {
+ if(!dontPromote) {
storeBlock.updateRecentlyUsed();
DatabaseEntry updateDBE = new DatabaseEntry();
storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
c.putCurrent(updateDBE);
c.close();
t.commit();
+ c = null;
+ t = null;
}else{
c.close();
t.abort();
+ c = null;
+ t = null;
}
Logger.minor(this, "Get key: "+chk);
@@ -398,6 +414,8 @@
c.putCurrent(updateDBE);
c.close();
t.commit();
+ c = null;
+ t = null;
return null;
}
return block;
@@ -419,7 +437,7 @@
// FIXME do this with interfaces etc.
- public DSAPublicKey fetchPubKey(byte[] hash, boolean dontPromote) throws
IOException {
+ public synchronized DSAPublicKey fetchPubKey(byte[] hash, boolean
dontPromote) throws IOException {
return fetchPubKey(hash, null, dontPromote);
}
@@ -429,8 +447,7 @@
* @param replacement If non-null, and the data exists but is corrupt,
replace it with this.
* @return null if there is no such block stored, otherwise the block.
*/
- public DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey replacement,
boolean dontPromote) throws IOException
- {
+ public synchronized DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey
replacement, boolean dontPromote) throws IOException {
if(closed)
return null;
@@ -447,6 +464,8 @@
!=OperationStatus.SUCCESS) {
c.close();
t.abort();
+ c = null;
+ t = null;
return null;
}
@@ -478,6 +497,8 @@
Logger.error(this, "Could not read key");
c.close();
t.abort();
+ c = null;
+ t = null;
return null;
}
@@ -498,6 +519,8 @@
c.putCurrent(updateDBE);
c.close();
t.commit();
+ t = null;
+ c = null;
return null;
}
}
@@ -505,6 +528,8 @@
// Finished, commit.
c.close();
t.commit();
+ c = null;
+ t = null;
Logger.minor(this, "Get key:
"+HexUtil.bytesToHex(hash));
Logger.minor(this, "Data: "+data.length+" bytes, hash
"+data);
@@ -526,7 +551,7 @@
// return null;
}
- public void put(CHKBlock b) throws IOException {
+ public synchronized void put(CHKBlock b) throws IOException {
NodeCHK chk = (NodeCHK) b.getKey();
CHKBlock oldBlock = fetch(chk, false);
if(oldBlock != null)
@@ -534,7 +559,7 @@
innerPut(b);
}
- public void put(SSKBlock b, boolean overwrite) throws IOException,
KeyCollisionException {
+ public synchronized void put(SSKBlock b, boolean overwrite) throws
IOException, KeyCollisionException {
NodeSSK ssk = (NodeSSK) b.getKey();
SSKBlock oldBlock = fetch(ssk, false);
if(oldBlock != null) {
@@ -542,18 +567,15 @@
if(!overwrite)
throw new KeyCollisionException();
else {
- if(overwrite(b)) {
- fetch(ssk, false); // promote it
- return;
- }
+ overwrite(b);
}
}
- return;
+ } else {
+ innerPut(b);
}
- innerPut(b);
}
- private boolean overwrite(SSKBlock b) throws IOException {
+ private synchronized boolean overwrite(SSKBlock b) throws IOException {
NodeSSK chk = (NodeSSK) b.getKey();
byte[] routingkey = chk.getRoutingKey();
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
@@ -563,7 +585,8 @@
try{
t = environment.beginTransaction(null,null);
c = chkDB.openCursor(t,null);
-
+
+ // Lock the record.
if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
!=OperationStatus.SUCCESS) {
c.close();
@@ -577,12 +600,22 @@
byte[] data = b.getRawData();
synchronized(chkStore) {
chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-
chkStore.write(header);
chkStore.write(data);
}
-
+
+ // Unlock record.
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+
} catch(Throwable ex) { // FIXME: ugly
+ checkSecondaryDatabaseError(ex);
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ } finally {
if(c!=null) {
try{c.close();}catch(DatabaseException ex2){}
@@ -590,11 +623,7 @@
if(t!=null) {
try{t.abort();}catch(DatabaseException ex2){}
}
-
- checkSecondaryDatabaseError(ex);
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- throw new IOException(ex.getMessage());
+
}
return true;
@@ -603,8 +632,7 @@
/**
* Store a block.
*/
- void innerPut(KeyBlock block) throws IOException
- {
+ private synchronized void innerPut(KeyBlock block) throws IOException {
if(closed)
return;
@@ -627,47 +655,57 @@
t = environment.beginTransaction(null,null);
DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
- synchronized(chkStore) {
- if(chkBlocksInStore<maxChkBlocks) {
- // Expand the store file
- long byteOffset =
chkBlocksInStore*(dataBlockSize+headerBlockSize);
- StoreBlock storeBlock = new
StoreBlock(chkBlocksInStore);
- DatabaseEntry blockDBE = new DatabaseEntry();
- storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
- chkDB.put(t,routingkeyDBE,blockDBE);
- try {
- chkStore.seek(byteOffset);
- } catch (IOException ioe) {
- if(byteOffset > (2*1024*1024*1024)) {
- Logger.error(this, "Environment does not support
files bigger than 2 GB?");
- System.out.println("Environment does not support
files bigger than 2 GB? (exception to follow)");
- }
- Logger.error(this, "Caught IOException on
chkStore.seek("+byteOffset+")");
- throw ioe;
- }
- chkStore.write(header);
- chkStore.write(data);
- t.commit();
- chkBlocksInStore++;
- }else{
- // Overwrite an other block
- Cursor c = chkDB_accessTime.openCursor(t,null);
- DatabaseEntry keyDBE = new DatabaseEntry();
- DatabaseEntry dataDBE = new DatabaseEntry();
- c.getFirst(keyDBE,dataDBE,null);
- StoreBlock oldStoreBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
- c.delete();
- c.close();
- StoreBlock storeBlock = new
StoreBlock(oldStoreBlock.getOffset());
- DatabaseEntry blockDBE = new DatabaseEntry();
-
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
- chkDB.put(t,routingkeyDBE,blockDBE);
-
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
- chkStore.write(header);
- chkStore.write(data);
- t.commit();
- }
- }
+ if(chkBlocksInStore<maxChkBlocks) {
+ // Expand the store file
+ long blockNum;
+ synchronized(chkBlocksInStoreLock) {
+ blockNum = chkBlocksInStore;
+ chkBlocksInStore++;
+ }
+ long byteOffset =
blockNum*(dataBlockSize+headerBlockSize);
+ StoreBlock storeBlock = new StoreBlock(blockNum);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+ synchronized(chkStore) {
+ try {
+ chkStore.seek(byteOffset);
+ } catch (IOException ioe) {
+ if(byteOffset > (2*1024*1024*1024)) {
+ Logger.error(this, "Environment
does not support files bigger than 2 GB?");
+ System.out.println("Environment
does not support files bigger than 2 GB? (exception to follow)");
+ }
+ Logger.error(this, "Caught IOException
on chkStore.seek("+byteOffset+")");
+ throw ioe;
+ }
+ chkStore.write(header);
+ chkStore.write(data);
+ }
+ t.commit();
+ t = null;
+ }else{
+ // Overwrite an other block
+ Cursor c = chkDB_accessTime.openCursor(t,null);
+ DatabaseEntry keyDBE = new DatabaseEntry();
+ DatabaseEntry dataDBE = new DatabaseEntry();
+ c.getFirst(keyDBE,dataDBE,LockMode.RMW);
+ StoreBlock oldStoreBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
+ c.delete();
+ c.close();
+ // Deleted, so we can now reuse it.
+ // Because we acquired a write lock, nobody else has
taken it.
+ StoreBlock storeBlock = new
StoreBlock(oldStoreBlock.getOffset());
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+ synchronized(chkStore) {
+
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
+ chkStore.write(header);
+ chkStore.write(data);
+ }
+ t.commit();
+ t = null;
+ }
Logger.minor(this, "Put key: "+block.getKey());
Logger.minor(this, "Headers: "+header.length+" bytes, hash
"+Fields.hashCode(header));
@@ -680,11 +718,12 @@
checkSecondaryDatabaseError(ex);
Logger.error(this, "Caught "+ex, ex);
ex.printStackTrace();
- throw new IOException(ex.getMessage());
+ if(ex instanceof IOException) throw (IOException) ex;
+ else throw new IOException(ex.getMessage());
}
}
- private void checkSecondaryDatabaseError(Throwable ex) {
+ private synchronized void checkSecondaryDatabaseError(Throwable ex) {
if(ex instanceof DatabaseException && ex.getMessage().indexOf("missing
key in the primary database") > -1) {
try {
fixSecondaryFile.createNewFile();
@@ -702,7 +741,7 @@
/**
* Store a pubkey.
*/
- public void put(byte[] hash, DSAPublicKey key) throws IOException {
+ public synchronized void put(byte[] hash, DSAPublicKey key) throws
IOException {
DSAPublicKey k = fetchPubKey(hash, key, true);
if(k == null)
innerPut(hash, key);
@@ -711,8 +750,7 @@
/**
* Store a block.
*/
- public void innerPut(byte[] hash, DSAPublicKey key) throws IOException
- {
+ private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {
if(closed)
return;
@@ -738,31 +776,44 @@
synchronized(chkStore) {
if(chkBlocksInStore<maxChkBlocks) {
// Expand the store file
- long byteOffset =
chkBlocksInStore*(dataBlockSize+headerBlockSize);
+ long blockNum;
+ synchronized(chkBlocksInStoreLock) {
+ blockNum = chkBlocksInStore;
+ chkBlocksInStore++;
+ }
+
+ long byteOffset =
blockNum*(dataBlockSize+headerBlockSize);
StoreBlock storeBlock = new
StoreBlock(chkBlocksInStore);
DatabaseEntry blockDBE = new DatabaseEntry();
storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
- chkDB.put(t,routingkeyDBE,blockDBE);
- chkStore.seek(byteOffset);
- chkStore.write(data);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+ synchronized(chkStore) {
+ chkStore.seek(byteOffset);
+ chkStore.write(data);
+ }
t.commit();
- chkBlocksInStore++;
+ t = null;
}else{
// Overwrite an other block
Cursor c = chkDB_accessTime.openCursor(t,null);
DatabaseEntry keyDBE = new DatabaseEntry();
DatabaseEntry dataDBE = new DatabaseEntry();
- c.getFirst(keyDBE,dataDBE,null);
+ c.getFirst(keyDBE,dataDBE,LockMode.RMW);
StoreBlock oldStoreBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
c.delete();
c.close();
+ // Deleted, so we can now reuse it.
+ // Because we acquired a write lock, nobody
else has taken it.
StoreBlock storeBlock = new
StoreBlock(oldStoreBlock.getOffset());
DatabaseEntry blockDBE = new DatabaseEntry();
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
chkDB.put(t,routingkeyDBE,blockDBE);
-
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
- chkStore.write(data);
+ synchronized(chkStore) {
+
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
+ chkStore.write(data);
+ }
t.commit();
+ t = null;
}
}
@@ -776,22 +827,20 @@
checkSecondaryDatabaseError(ex);
Logger.error(this, "Caught "+ex, ex);
ex.printStackTrace();
- throw new IOException(ex.getMessage());
+ if(ex instanceof IOException) throw (IOException) ex;
+ else throw new IOException(ex.getMessage());
}
}
- private class StoreBlock
- {
+ private class StoreBlock {
private long recentlyUsed;
private long offset;
- public StoreBlock(long offset)
- {
+ public StoreBlock(long offset) {
this(offset,getNewRecentlyUsed());
}
- public StoreBlock(long offset,long recentlyUsed)
- {
+ public StoreBlock(long offset,long recentlyUsed) {
this.offset = offset;
this.recentlyUsed = recentlyUsed;
}
@@ -801,13 +850,11 @@
return recentlyUsed;
}
- public void setRecentlyUsedToZero()
- {
+ public void setRecentlyUsedToZero() {
recentlyUsed = 0;
}
- public void updateRecentlyUsed()
- {
+ public void updateRecentlyUsed() {
recentlyUsed = getNewRecentlyUsed();
}
@@ -819,8 +866,7 @@
/**
* Convert StoreBlock's to the format used by the database
*/
- private class StoreBlockTupleBinding extends TupleBinding
- {
+ private class StoreBlockTupleBinding extends TupleBinding {
public void objectToEntry(Object object, TupleOutput to) {
StoreBlock myData = (StoreBlock)object;
@@ -882,8 +928,7 @@
}
- private class ShutdownHook extends Thread
- {
+ private class ShutdownHook extends Thread {
public void run() {
close();
}
@@ -938,8 +983,7 @@
return count;
}
- private long getMaxRecentlyUsed()
- {
+ private long getMaxRecentlyUsed() {
long maxRecentlyUsed = 0;
try{
@@ -956,12 +1000,14 @@
return maxRecentlyUsed;
}
- private synchronized long getNewRecentlyUsed() {
- lastRecentlyUsed++;
- return lastRecentlyUsed;
+ private long getNewRecentlyUsed() {
+ synchronized(lastRecentlyUsedSync) {
+ lastRecentlyUsed++;
+ return lastRecentlyUsed;
+ }
}
public void setMaxKeys(long maxStoreKeys) {
maxChkBlocks = maxStoreKeys;
}
-}
\ No newline at end of file
+}