Author: toad
Date: 2006-01-05 12:56:47 +0000 (Thu, 05 Jan 2006)
New Revision: 7753
Added:
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Modified:
trunk/freenet/src/freenet/keys/NodeCHK.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/TestnetHandler.java
trunk/freenet/src/freenet/node/Version.java
Log:
310 (mandatory): Merge BDB store branch (***DATASTORE RESET***). Also some
stderr logging.
Modified: trunk/freenet/src/freenet/keys/NodeCHK.java
===================================================================
--- trunk/freenet/src/freenet/keys/NodeCHK.java 2006-01-05 12:55:43 UTC (rev
7752)
+++ trunk/freenet/src/freenet/keys/NodeCHK.java 2006-01-05 12:56:47 UTC (rev
7753)
@@ -61,4 +61,8 @@
public short getType() {
return TYPE;
}
+
+ public byte[] getRoutingKey(){
+ return routingKey;
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-01-05 12:55:43 UTC (rev
7752)
+++ trunk/freenet/src/freenet/node/Node.java 2006-01-05 12:56:47 UTC (rev
7753)
@@ -49,6 +49,7 @@
import freenet.keys.KeyBlock;
import freenet.keys.NodeCHK;
import freenet.store.BaseFreenetStore;
+import freenet.store.BerkeleyDBFreenetStore;
import freenet.store.FreenetStore;
import freenet.support.BucketFactory;
import freenet.support.FileLoggerHook;
@@ -342,17 +343,20 @@
downloadDir = new File("downloads");
downloadDir.mkdir();
try {
- datastore = new
BaseFreenetStore(prefix+"freenet-"+portNumber,16384); // 512MB
+ datastore = new BerkeleyDBFreenetStore(prefix+"store-"+portNumber,
32768); // 1GB
} catch (FileNotFoundException e1) {
Logger.error(this, "Could not open datastore: "+e1, e1);
+ System.err.println("Could not open datastore: "+e1);
System.exit(EXIT_STORE_FILE_NOT_FOUND);
throw new Error();
} catch (IOException e1) {
Logger.error(this, "Could not open datastore: "+e1, e1);
+ System.err.println("Could not open datastore: "+e1);
System.exit(EXIT_STORE_IOEXCEPTION);
throw new Error();
} catch (Exception e1) {
Logger.error(this, "Could not open datastore: "+e1, e1);
+ System.err.println("Could not open datastore: "+e1);
System.exit(EXIT_STORE_OTHER);
throw new Error();
}
@@ -423,6 +427,7 @@
// FIXME reenable the above
insertThrottle = new RequestThrottle(10000, 2.0F);
insertStarter = new RequestStarter(insertThrottle, "Insert
starter ("+portNumber+")");
+ System.err.println("Created Node on port "+port);
}
void start(SwapRequestInterval interval) {
Modified: trunk/freenet/src/freenet/node/TestnetHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/TestnetHandler.java 2006-01-05 12:55:43 UTC
(rev 7752)
+++ trunk/freenet/src/freenet/node/TestnetHandler.java 2006-01-05 12:56:47 UTC
(rev 7753)
@@ -47,6 +47,7 @@
serverThread = new Thread(this, "Testnet handler thread");
serverThread.setDaemon(true);
serverThread.start();
+ System.err.println("Started testnet handler on port
"+testnetPort);
}
private final Node node;
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-01-05 12:55:43 UTC (rev
7752)
+++ trunk/freenet/src/freenet/node/Version.java 2006-01-05 12:56:47 UTC (rev
7753)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 309;
+ public static final int buildNumber = 310;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 305;
+ public static final int lastGoodBuild = 310;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Added: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-05
12:55:43 UTC (rev 7752)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-05
12:56:47 UTC (rev 7753)
@@ -0,0 +1,422 @@
+package freenet.store;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.BtreeStats;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.SecondaryConfig;
+import com.sleepycat.je.SecondaryDatabase;
+import com.sleepycat.je.SecondaryKeyCreator;
+import com.sleepycat.je.Transaction;
+
+import freenet.keys.CHKBlock;
+import freenet.keys.CHKVerifyException;
+import freenet.keys.NodeCHK;
+import freenet.support.Fields;
+import freenet.support.Logger;
+
+/**
+ * Freenet datastore based on BerkelyDB Java Edition by sleepycat software
+ * More info at http://www.sleepycat.com/products/bdbje.html
+ *
+ * @author tubbie
+ *
+ * TODO: Fix ugly Exception handling
+ */
+public class BerkeleyDBFreenetStore implements FreenetStore {
+
+ static final int CHK_DATA_BLOCK_SIZE = 32*1024;
+ static final int CHK_HEADER_BLOCK_SIZE = 36;
+
+ private final Environment environment;
+ private final TupleBinding storeBlockTupleBinding;
+ private final TupleBinding longTupleBinding;
+
+ private int chkBlocksInStore;
+ private final int maxChkBlocks;
+ private final Database chkDB;
+ private final Database chkDB_accessTime;
+ private final RandomAccessFile chkStore;
+
+ private long lastRecentlyUsed;
+
+ private boolean closed = false;
+
+ /**
+ * Initializes database
+ * @param the directory where the store is located
+ * @throws FileNotFoundException if the dir does not exist and could not
be created
+ */
+ public BerkeleyDBFreenetStore(String storeDir,int maxChkBlocks) throws
Exception
+ {
+ // Percentage of the database that must contain usefull data
+ // decrease to increase performance, increase to save disk space
+ System.setProperty("je.cleaner.minUtilization","98");
+
+ // Delete empty log files
+ System.setProperty("je.cleaner.expunge","true");
+
+ // Percentage of the maximum heap size used as a cache
+ System.setProperty("je.maxMemoryPercent","30");
+
+ this.maxChkBlocks=maxChkBlocks;
+
+ // Initialize environment
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setTxnWriteNoSync(true);
+ File dir = new File(storeDir);
+ if(!dir.exists())
+ dir.mkdir();
+ File dbDir = new File(dir,"database");
+ if(!dbDir.exists())
+ dbDir.mkdir();
+
+ environment = new Environment(dbDir, envConfig);
+
+ // Initialize CHK database
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(true);
+ chkDB = environment.openDatabase(null,"CHK",dbConfig);
+
+ // Initialize secondary CHK database sorted on accesstime
+ SecondaryConfig secDbConfig = new SecondaryConfig();
+ secDbConfig.setAllowCreate(true);
+ secDbConfig.setSortedDuplicates(true);
+ secDbConfig.setTransactional(true);
+ storeBlockTupleBinding = new StoreBlockTupleBinding();
+ longTupleBinding = TupleBinding.getPrimitiveBinding(Long.class);
+ AccessTimeKeyCreator accessTimeKeyCreator =
+ new AccessTimeKeyCreator(storeBlockTupleBinding);
+ secDbConfig.setKeyCreator(accessTimeKeyCreator);
+ chkDB_accessTime = environment.openSecondaryDatabase
+ (null,
"CHK_accessTime", chkDB, secDbConfig);
+
+ // Initialize the store file
+ File storeFile = new File(dir,"store");
+ if(!storeFile.exists())
+ storeFile.createNewFile();
+ chkStore = new RandomAccessFile(storeFile,"rw");
+
+ chkBlocksInStore = countCHKBlocks();
+ lastRecentlyUsed = getMaxRecentlyUsed();
+
+// Add shutdownhook
+ Runtime.getRuntime().addShutdownHook(new ShutdownHook());
+ }
+
+ /**
+ * Retrieve a block.
+ * @param dontPromote If true, don't promote data if fetched.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException
+ {
+ if(closed)
+ return null;
+
+ byte[] routingkey = chk.getRoutingKey();
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ Cursor c = null;
+ try{
+ Transaction t = environment.beginTransaction(null,null);
+ c = chkDB.openCursor(t,null);
+
+ if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.DEFAULT)
+ !=OperationStatus.SUCCESS) {
+ c.close();
+ t.abort();
+ return null;
+ }
+
+ StoreBlock storeBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(blockDBE);
+
+ CHKBlock block = null;
+ try{
+ byte[] header = new byte[CHK_HEADER_BLOCK_SIZE];
+ byte[] data = new byte[CHK_DATA_BLOCK_SIZE];
+ synchronized(chkStore) {
+
chkStore.seek(storeBlock.offset*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+ chkStore.read(header);
+ chkStore.read(data);
+ }
+
+
+ block = new CHKBlock(data,header,chk);
+
+ if(!dontPromote)
+ {
+ storeBlock.updateRecentlyUsed();
+ DatabaseEntry updateDBE = new DatabaseEntry();
+
storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+ c.putCurrent(updateDBE);
+ c.close();
+ t.commit();
+ }else{
+ c.close();
+ t.abort();
+ }
+
+ Logger.minor(this, "Get key: "+chk);
+ Logger.minor(this, "Headers: "+header.length+" bytes, hash
"+header);
+ Logger.minor(this, "Data: "+data.length+" bytes, hash
"+data);
+
+ }catch(CHKVerifyException ex){
+ Logger.normal(this, "Does not verify, setting
accessTime to 0 for : "+chk);
+ storeBlock.setRecentlyUsedToZero();
+ DatabaseEntry updateDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
updateDBE);
+ c.putCurrent(updateDBE);
+ c.close();
+ t.commit();
+ return null;
+ }
+ return block;
+ }catch(Exception ex) { // FIXME: ugly
+ if(c!=null) {
+ try{c.close();}catch(DatabaseException ex2){}
+ }
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ }
+
+// return null;
+ }
+
+ /**
+ * Store a block.
+ */
+ public void put(CHKBlock block) throws IOException
+ {
+ if(closed)
+ return;
+
+ byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
+ byte[] data = block.getData();
+ byte[] header = block.getHeader();
+
+ if(data.length!=CHK_DATA_BLOCK_SIZE) {
+ Logger.minor(this, "This data is "+data.length+" bytes. Should
be "+CHK_DATA_BLOCK_SIZE);
+ return;
+ }
+ if(header.length!=CHK_HEADER_BLOCK_SIZE) {
+ Logger.minor(this, "This header is "+data.length+" bytes.
Should be "+CHK_HEADER_BLOCK_SIZE);
+ return;
+ }
+
+ Transaction t = null;
+
+ try{
+ t = environment.beginTransaction(null,null);
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+
+ synchronized(chkStore) {
+ if(chkBlocksInStore<maxChkBlocks) {
+ // Expand the store file
+ int byteOffset =
chkBlocksInStore*(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE);
+ StoreBlock storeBlock = new
StoreBlock(chkBlocksInStore);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+ chkStore.seek(byteOffset);
+ chkStore.write(header);
+ chkStore.write(data);
+ t.commit();
+ chkBlocksInStore++;
+ }else{
+ // Overwrite an other block
+ Cursor c = chkDB_accessTime.openCursor(t,null);
+ DatabaseEntry keyDBE = new DatabaseEntry();
+ DatabaseEntry dataDBE = new DatabaseEntry();
+ c.getFirst(keyDBE,dataDBE,null);
+ StoreBlock oldStoreBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
+ c.delete();
+ c.close();
+ StoreBlock storeBlock = new
StoreBlock(oldStoreBlock.getOffset());
+ DatabaseEntry blockDBE = new DatabaseEntry();
+
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+
chkStore.seek(storeBlock.getOffset()*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+ chkStore.write(header);
+ chkStore.write(data);
+ t.commit();
+ }
+ }
+
+ Logger.minor(this, "Put key: "+block.getKey());
+ Logger.minor(this, "Headers: "+header.length+" bytes, hash
"+Fields.hashCode(header));
+ Logger.minor(this, "Data: "+data.length+" bytes, hash
"+Fields.hashCode(data));
+
+ }catch(Exception ex) { // FIXME: ugly
+ if(t!=null){
+ try{t.abort();}catch(DatabaseException ex2){};
+ }
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ }
+ }
+
+ private class StoreBlock
+ {
+ private long recentlyUsed;
+ private int offset;
+
+ public StoreBlock(int offset)
+ {
+ this(offset,getNewRecentlyUsed());
+ }
+
+ public StoreBlock(int offset,long recentlyUsed)
+ {
+ this.offset = offset;
+ this.recentlyUsed = recentlyUsed;
+ }
+
+
+ public long getRecentlyUsed() {
+ return recentlyUsed;
+ }
+
+ public void setRecentlyUsedToZero()
+ {
+ recentlyUsed = 0;
+ }
+
+ public void updateRecentlyUsed()
+ {
+ recentlyUsed = getNewRecentlyUsed();
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+ }
+
+ /**
+ * Convert StoreBlock's to the format used by the database
+ */
+ private class StoreBlockTupleBinding extends TupleBinding
+ {
+
+ public void objectToEntry(Object object, TupleOutput to) {
+ StoreBlock myData = (StoreBlock)object;
+
+ to.writeInt(myData.getOffset());
+ to.writeLong(myData.getRecentlyUsed());
+ }
+
+ public Object entryToObject(TupleInput ti) {
+ int offset = ti.readInt();
+ long lastAccessed = ti.readLong();
+
+ StoreBlock storeBlock = new StoreBlock(offset,lastAccessed);
+ return storeBlock;
+ }
+ }
+
+ /**
+ * Used to create the secondary database sorted on accesstime
+ */
+ private class AccessTimeKeyCreator implements SecondaryKeyCreator {
+ private TupleBinding theBinding;
+
+ public AccessTimeKeyCreator(TupleBinding theBinding1) {
+ theBinding = theBinding1;
+ }
+
+ public boolean createSecondaryKey(SecondaryDatabase secDb,
+ DatabaseEntry keyEntry,
+ DatabaseEntry dataEntry,
+ DatabaseEntry resultEntry) {
+
+ StoreBlock storeblock = (StoreBlock)
theBinding.entryToObject(dataEntry);
+ Long accessTime = new Long(storeblock.getRecentlyUsed());
+ longTupleBinding.objectToEntry(accessTime, resultEntry);
+ return true;
+ }
+ }
+
+ private class ShutdownHook extends Thread
+ {
+ public void run() {
+ close();
+ }
+ }
+
+ private void close() {
+ try{
+ // FIXME: we should be sure all access to the
database has stopped
+ // before we try to close it.
Currently we just guess
+ // This is nothing too problematic however
since the worst thing that should
+ // happen is that we miss the last few
store()'s and get an exception.
+ Logger.minor(this, "Closing database.");
+ closed=true;
+ // Give all threads some time to complete
+ Thread.sleep(5000);
+ chkStore.close();
+ chkDB_accessTime.close();
+ chkDB.close();
+ environment.close();
+ Logger.minor(this, "Closing database finished.");
+ }catch(Exception ex){
+ Logger.error(this,"Error while closing database.",ex);
+ ex.printStackTrace();
+ }
+ }
+
+ private int countCHKBlocks() {
+ int count =0;
+ try{
+ Logger.minor(this, "Started counting items in database");
+
+ count =
(int)((BtreeStats)chkDB.getStats(null)).getLeafNodeCount();
+
+ Logger.minor(this, "Counted "+count+" items in database");
+ }catch(DatabaseException ex){
+ Logger.minor(this, "Exception while counting items in
database",ex);
+ }
+ return count;
+ }
+
+ private long getMaxRecentlyUsed()
+ {
+ long maxRecentlyUsed = 0;
+
+ try{
+ Cursor c = chkDB_accessTime.openCursor(null,null);
+ DatabaseEntry keyDBE = new DatabaseEntry();
+ DatabaseEntry dataDBE = new DatabaseEntry();
+
if(c.getLast(keyDBE,dataDBE,null)==OperationStatus.SUCCESS) {
+ StoreBlock storeBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
+ maxRecentlyUsed = storeBlock.getRecentlyUsed();
+ }
+ c.close();
+ }catch(DatabaseException ex){ex.printStackTrace();}
+
+ return maxRecentlyUsed;
+ }
+
+ private synchronized long getNewRecentlyUsed() {
+ lastRecentlyUsed++;
+ return lastRecentlyUsed;
+ }
+}
\ No newline at end of file