Author: tubbie
Date: 2005-12-12 22:27:27 +0000 (Mon, 12 Dec 2005)
New Revision: 7709

Added:
   branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
Modified:
   branches/bdb/src/freenet/keys/NodeCHK.java
Log:
Initial commit of a store based on Berkeley DB. This store keeps everything in 
the database itself.

Modified: branches/bdb/src/freenet/keys/NodeCHK.java
===================================================================
--- branches/bdb/src/freenet/keys/NodeCHK.java  2005-12-12 22:19:04 UTC (rev 
7708)
+++ branches/bdb/src/freenet/keys/NodeCHK.java  2005-12-12 22:27:27 UTC (rev 
7709)
@@ -90,4 +90,8 @@
         cachedNormalizedDouble = ((double)asLong)/((double)Long.MAX_VALUE);
         return cachedNormalizedDouble;
     }
+    
+    /**
+     * Returns the routing key held by this key object.
+     * The routingKey reference is returned as-is (no copy is made here),
+     * so callers should treat the returned array as read-only.
+     */
+    public byte[] getRoutingKey(){
+       return routingKey;
+    }
 }

Added: branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
===================================================================
--- branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java   2005-12-12 
22:19:04 UTC (rev 7708)
+++ branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java   2005-12-12 
22:27:27 UTC (rev 7709)
@@ -0,0 +1,426 @@
+package freenet.store;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.BtreeStats;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.SecondaryConfig;
+import com.sleepycat.je.SecondaryDatabase;
+import com.sleepycat.je.SecondaryKeyCreator;
+import com.sleepycat.je.Transaction;
+
+import freenet.keys.CHKBlock;
+import freenet.keys.CHKVerifyException;
+import freenet.keys.NodeCHK;
+import freenet.support.Fields;
+import freenet.support.Logger;
+
+/** 
+ * Freenet datastore based on Berkeley DB Java Edition by Sleepycat Software.
+ * More info at http://www.sleepycat.com/products/bdbje.html
+ * 
+ * @author tubbie
+ * 
+ * TODO: Fix ugly Exception handling
+ */
+public class BerkelyDBFreenetStore implements FreenetStore {
+
+       /** Berkeley DB environment that owns both databases below. */
+       private final Environment environment;
+       /** Converts StoreBlock objects to and from database entries. */
+       private final TupleBinding storeBlockTupleBinding;
+       /** Converts Long access times to database entries (secondary keys). */
+       private final TupleBinding longTupleBinding;
+       /** Primary database: routing key -> serialized StoreBlock. */
+       private final Database chkDB;
+       /** Secondary database over chkDB, keyed on last access time. */
+       private final Database chkDB_accessTime;
+       
+       /**
+        * Set once the store is shutting down. Written by the shutdown hook
+        * thread and read by fetch()/put() callers, hence volatile.
+        */
+       private volatile boolean closed = false;
+       
+       /** Background task that trims the store when it holds too many blocks. */
+       private final StoreDeleter storeDeleter;
+       
+       /** Number of blocks above which the StoreDeleter starts removing items. */
+       private final int maxBlocks;
+       
+       /**
+        * Opens the Berkeley DB environment in the given directory (creating
+        * the directory, environment and databases when they do not exist yet),
+        * starts the StoreDeleter thread and registers a JVM shutdown hook that
+        * closes the store.
+        *
+        * Note: three "je.*" system properties are set here, which affects
+        * every Berkeley DB environment opened afterwards in this JVM.
+        *
+        * @param storeDir the directory where the store is located
+        * @param maxBlocks number of blocks above which the StoreDeleter starts
+        *        removing items from the store
+        * @throws FileNotFoundException if the dir does not exist and could not
+        *         be created
+        * @throws Exception if the environment or a database cannot be opened
+        */
+       public BerkelyDBFreenetStore(String storeDir,int maxBlocks) throws Exception
+       {
+           // Percentage of the database that must contain useful data;
+           // decrease to increase performance, increase to save disk space
+           System.setProperty("je.cleaner.minUtilization","98");
+
+           // Delete empty log files
+           System.setProperty("je.cleaner.expunge","true");
+
+           // Percentage of the maximum heap size used as a cache
+           System.setProperty("je.maxMemoryPercent","30");
+
+           this.maxBlocks=maxBlocks;
+
+           // Make sure the store directory exists before opening the
+           // environment. The second isDirectory() check covers the case where
+           // another thread/process created the directory in the meantime.
+           File dir = new File(storeDir);
+           if(!dir.isDirectory() && !dir.mkdir() && !dir.isDirectory())
+               throw new FileNotFoundException(
+                       "Store directory does not exist and could not be created: "
+                       +dir.getAbsolutePath());
+
+           // Initialize environment
+           EnvironmentConfig envConfig = new EnvironmentConfig();
+           envConfig.setAllowCreate(true);
+           envConfig.setTransactional(true);
+           envConfig.setTxnWriteNoSync(true);
+           environment = new Environment(dir, envConfig);
+
+           // Initialize CHK database
+           DatabaseConfig dbConfig = new DatabaseConfig();
+           dbConfig.setAllowCreate(true);
+           dbConfig.setTransactional(true);
+           chkDB = environment.openDatabase(null,"CHK",dbConfig);
+
+           // Initialize secondary CHK database sorted on accesstime.
+           // Duplicates are allowed because several blocks may share the same
+           // access time.
+           SecondaryConfig secDbConfig = new SecondaryConfig();
+           secDbConfig.setAllowCreate(true);
+           secDbConfig.setSortedDuplicates(true);
+           secDbConfig.setTransactional(true);
+           storeBlockTupleBinding = new StoreBlockTupleBinding();
+           longTupleBinding = TupleBinding.getPrimitiveBinding(Long.class);
+           AccessTimeKeyCreator accessTimeKeyCreator =
+               new AccessTimeKeyCreator(storeBlockTupleBinding);
+           secDbConfig.setKeyCreator(accessTimeKeyCreator);
+           chkDB_accessTime = environment.openSecondaryDatabase
+               (null, "CHK_accessTime", chkDB, secDbConfig);
+
+           // Initialize thread that deletes items when the store gets full
+           storeDeleter = new StoreDeleter();
+           (new Thread(storeDeleter,"BDBStore: StoreDeleter")).start();
+
+           // Add shutdownhook
+           Runtime.getRuntime().addShutdownHook(new ShutdownHook());
+       }
+       
+       /**
+        * Retrieve a block.
+        *
+        * A stored block that no longer verifies against its key is deleted
+        * from the store and reported as absent.
+        *
+        * @param chk key of the block to look up
+        * @param dontPromote If true, don't promote data if fetched (the stored
+        *        access time is left untouched).
+        * @return null if there is no such block stored (or the store is
+        *         closed), otherwise the block.
+        * @throws IOException if the underlying database operation fails
+        */
+       public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException
+       {
+           if(closed)
+               return null;
+
+           byte[] routingkey = chk.getRoutingKey();
+           DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+           DatabaseEntry blockDBE = new DatabaseEntry();
+           Transaction t = null;
+           Cursor c = null;
+           try{
+               t = environment.beginTransaction(null,null);
+               c = chkDB.openCursor(t,null);
+
+               if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.DEFAULT)
+                       !=OperationStatus.SUCCESS) {
+                   // Not stored; the finally block releases cursor and transaction
+                   return null;
+               }
+
+               StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+
+               CHKBlock block;
+               try{
+                   block = new CHKBlock(storeBlock.getData(),storeBlock.getHeader(),chk);
+               }catch(CHKVerifyException ex){
+                   Logger.normal(this, "Does not verify, deleting: "+chk);
+                   c.delete();
+                   c.close();
+                   c = null;
+                   // Commit (not abort): aborting would roll the delete back
+                   // while the item counter below is still decremented.
+                   t.commit();
+                   t = null;
+                   storeDeleter.deletedItem();
+                   return null;
+               }
+
+               if(!dontPromote)
+               {
+                   // Rewrite the record with a fresh access time
+                   storeBlock.updateAccessTime();
+                   DatabaseEntry updateDBE = new DatabaseEntry();
+                   storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+                   c.putCurrent(updateDBE);
+               }
+
+               // The cursor must be closed before its transaction ends
+               c.close();
+               c = null;
+               t.commit();
+               t = null;
+
+               Logger.minor(this, "Get key: "+chk);
+               Logger.minor(this, "Headers: "+storeBlock.getHeader().length+" bytes, hash "+Fields.hashCode(storeBlock.getHeader()));
+               Logger.minor(this, "Data: "+storeBlock.getData().length+" bytes, hash "+Fields.hashCode(storeBlock.getData()));
+
+               return block;
+           }catch(DatabaseException ex) {
+               // initCause() keeps the original stack trace attached
+               IOException ioe = new IOException(ex.getMessage());
+               ioe.initCause(ex);
+               throw ioe;
+           }finally{
+               // Only non-null when we left the try block early or abnormally
+               if(c!=null) {
+                   try{c.close();}catch(DatabaseException ex2){ /* best effort cleanup */ }
+               }
+               if(t!=null) {
+                   try{t.abort();}catch(DatabaseException ex2){ /* best effort cleanup */ }
+               }
+           }
+       }
+
+    /**
+     * Store a block.
+     *
+     * If a block with the same routing key is already stored it is
+     * overwritten (which also refreshes its access time) and is not counted
+     * as a new item. Does nothing when the store is closed.
+     *
+     * @param block the block to store; its key must be a NodeCHK
+     * @throws IOException if the block could not be written to the database
+     */
+    public void put(CHKBlock block) throws IOException
+    {
+        if(closed)
+            return;
+
+        byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
+        byte[] data = block.getData();
+        byte[] header = block.getHeader();
+        StoreBlock storeBlock = new StoreBlock(data,header);
+
+        Transaction t = null;
+        try{
+            DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+
+            DatabaseEntry blockDBE = new DatabaseEntry();
+            storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
+
+            t = environment.beginTransaction(null,null);
+            Logger.minor(this, "Put key: "+block.getKey());
+            Logger.minor(this, "Headers: "+header.length+" bytes, hash "+Fields.hashCode(header));
+            Logger.minor(this, "Data: "+data.length+" bytes, hash "+Fields.hashCode(data));
+
+            // Try a plain insert first so we know whether the key is new;
+            // only new keys may increase the item count, otherwise the
+            // StoreDeleter would start evicting blocks from a non-full store.
+            boolean isNew = chkDB.putNoOverwrite(t,routingkeyDBE,blockDBE)
+                    ==OperationStatus.SUCCESS;
+            if(!isNew)
+                chkDB.put(t,routingkeyDBE,blockDBE);
+            t.commit();
+            t = null;
+
+            if(isNew)
+                storeDeleter.addedItem();
+
+        }catch(Exception ex) {  // FIXME: ugly
+            Logger.error(this, "Could not store block: "+block.getKey(), ex);
+            // initCause() keeps the original stack trace attached
+            IOException ioe = new IOException(ex.getMessage());
+            ioe.initCause(ex);
+            throw ioe;
+        }finally{
+            // Only non-null when the transaction was neither committed nor aborted
+            if(t!=null){
+                try{t.abort();}catch(DatabaseException ex2){ /* best effort cleanup */ }
+            }
+        }
+    }
+    
+    /**
+     * In-memory form of one stored record: the block's data and header
+     * plus the time (milliseconds, from System.currentTimeMillis()) at which
+     * it was last accessed.
+     */
+    private class StoreBlock
+    {
+        private final byte[] data;
+        private final byte[] header;
+        private long lastAccessed;
+
+        /** Creates a block whose access time is the current time. */
+        public StoreBlock(byte[] data,byte[] header)
+        {
+            this(data,header,System.currentTimeMillis());
+        }
+
+        /** Creates a block with an explicit access time. */
+        public StoreBlock(byte[] data,byte[] header,long lastAccessed)
+        {
+            this.lastAccessed = lastAccessed;
+            this.header = header;
+            this.data = data;
+        }
+
+        public byte[] getData()
+        {
+            return this.data;
+        }
+
+        public byte[] getHeader()
+        {
+            return this.header;
+        }
+
+        public long getLastAccessed()
+        {
+            return this.lastAccessed;
+        }
+
+        /** Sets the access time to the current time. */
+        public void updateAccessTime()
+        {
+            this.lastAccessed = System.currentTimeMillis();
+        }
+    }
+        
+    /**
+     * Convert StoreBlock's to the format used by the database.
+     *
+     * Record layout, in order: header length (one char), header bytes,
+     * data length (one char), data bytes, last access time (one long).
+     * Because lengths are written as a char, neither array may be longer
+     * than Character.MAX_VALUE (65535) bytes.
+     */
+    private class StoreBlockTupleBinding extends TupleBinding
+    {
+
+        public void objectToEntry(Object object, TupleOutput to)  {
+            StoreBlock myData = (StoreBlock)object;
+
+            writeByteArray(myData.getHeader(),to);
+            writeByteArray(myData.getData(),to);
+            to.writeLong(myData.getLastAccessed());
+        }
+
+        public Object entryToObject(TupleInput ti) {
+            byte[] header = readByteArray(ti);
+            byte[] data = readByteArray(ti);
+            long lastAccessed = ti.readLong();
+
+            return new StoreBlock(data,header,lastAccessed);
+        }
+
+        /**
+         * Reads one length-prefixed byte array.
+         * @throws RuntimeException if the entry ends before the announced
+         *         number of bytes could be read, or reading fails
+         */
+        private byte[] readByteArray(TupleInput ti) {
+            try{
+                int size = ti.readChar();
+                byte[] data = new byte[size];
+                int read = 0;
+                while(read<size) {
+                    int n = ti.read(data,read,size-read);
+                    // A negative count means end of input; adding it to the
+                    // offset would silently corrupt the result.
+                    if(n<0)
+                        throw new RuntimeException("Truncated store entry: expected "
+                                +size+" bytes but only "+read+" available");
+                    read += n;
+                }
+                return data;
+            }catch(IOException ex){
+                throw new RuntimeException("Could not read store entry: "+ex.getMessage(), ex);
+            }
+        }
+
+        /**
+         * Writes one length-prefixed byte array.
+         * @throws IllegalArgumentException if the array is too long for the
+         *         char length prefix
+         * @throws RuntimeException if writing fails
+         */
+        private void writeByteArray(byte[] data,TupleOutput to)
+        {
+            // writeChar() keeps only the low 16 bits, so a longer array would
+            // be stored with a wrong length and be unreadable afterwards.
+            if(data.length>Character.MAX_VALUE)
+                throw new IllegalArgumentException("Byte array too long to store: "
+                        +data.length+" bytes");
+            try {
+                to.writeChar(data.length);
+                to.write(data);
+            }catch(IOException ex){
+                throw new RuntimeException("Could not write store entry: "+ex.getMessage(), ex);
+            }
+        }
+    }
+      
+    /**
+     * Key creator for the secondary database: derives the secondary key
+     * (the last access time) from a serialized StoreBlock.
+     */
+    private class AccessTimeKeyCreator implements SecondaryKeyCreator {
+        private TupleBinding theBinding;
+
+        public AccessTimeKeyCreator(TupleBinding theBinding1) {
+            this.theBinding = theBinding1;
+        }
+
+        /**
+         * Decodes the primary record and writes its access time into
+         * resultEntry. Always returns true, i.e. every record gets a
+         * secondary key.
+         */
+        public boolean createSecondaryKey(
+                SecondaryDatabase secDb,
+                DatabaseEntry keyEntry,
+                DatabaseEntry dataEntry,
+                DatabaseEntry resultEntry) {
+            StoreBlock decoded = (StoreBlock) this.theBinding.entryToObject(dataEntry);
+            longTupleBinding.objectToEntry(new Long(decoded.getLastAccessed()), resultEntry);
+            return true;
+        }
+    }
+
+    /**
+     * Cleans items from the store when it's full.
+     *
+     * Runs as a background thread: sleeps until the item count exceeds
+     * maxBlocks, then repeatedly deletes the first record of the access-time
+     * index until the count is back within the limit. The fields running,
+     * stopped and currentItems are only accessed while holding this object's
+     * monitor.
+     */
+    private class StoreDeleter implements Runnable
+    {
+        /** Pause (ms) after a failed delete, so failures do not cause a hot loop. */
+        private static final long RETRY_DELAY_MS = 1000;
+
+        private boolean running=true;
+        private boolean stopped=false;
+        private int currentItems;
+
+        public StoreDeleter() {
+            currentItems = countCHKBlocks();
+        }
+
+        /** Records that one item was added and wakes the deleter thread. */
+        public synchronized void addedItem() {
+            currentItems++;
+            // notifyAll: close() may be waiting on the same monitor
+            this.notifyAll();
+        }
+
+        /** Records that one item was removed. */
+        public synchronized void deletedItem() {
+            currentItems--;
+        }
+
+        public void run() {
+            while(true){
+                synchronized(this) {
+                    // Sleep while there is nothing to delete. The condition is
+                    // "<=" so that a store holding exactly maxBlocks items
+                    // waits instead of spinning.
+                    while(running && currentItems <= maxBlocks) {
+                        try{wait();}catch(InterruptedException ex){ /* re-check condition */ }
+                    }
+                    if(!running)
+                        break;
+                }
+
+                if(!removeOldestItem()) {
+                    // Nothing could be deleted; back off before trying again
+                    synchronized(this) {
+                        if(running) {
+                            try{wait(RETRY_DELAY_MS);}catch(InterruptedException ex){ /* retry */ }
+                        }
+                    }
+                }
+            }
+
+            // Tell close() that this thread is done
+            synchronized(this){
+                stopped=true;
+                this.notifyAll();
+            }
+        }
+
+        /**
+         * Deletes the first record of the access-time index.
+         * @return true if an item was deleted, false if there was nothing to
+         *         delete or the delete failed
+         */
+        private boolean removeOldestItem() {
+            Logger.normal(this, "Deleting oldest item in store");
+            Transaction t = null;
+            Cursor c = null;
+            try{
+                t = environment.beginTransaction(null,null);
+                c = chkDB_accessTime.openCursor(t,null);
+
+                DatabaseEntry keyDBE = new DatabaseEntry();
+                DatabaseEntry dataDBE = new DatabaseEntry();
+
+                boolean deleted =
+                    c.getFirst(keyDBE,dataDBE,null)==OperationStatus.SUCCESS
+                    && c.delete()==OperationStatus.SUCCESS;
+
+                // The cursor must be closed before its transaction ends
+                c.close();
+                c = null;
+                if(deleted)
+                    t.commit();
+                else
+                    t.abort();
+                t = null;
+
+                if(deleted)
+                    deletedItem();
+                return deleted;
+            }catch(Exception ex) {
+                Logger.error(this, "Could not delete oldest item in store", ex);
+                return false;
+            }finally{
+                // Only non-null when we left the try block abnormally
+                if(c!=null) {
+                    try{c.close();}catch(DatabaseException ex2){ /* best effort cleanup */ }
+                }
+                if(t!=null) {
+                    try{t.abort();}catch(DatabaseException ex2){ /* best effort cleanup */ }
+                }
+            }
+        }
+
+        /** Asks the deleter thread to stop and blocks until it has. */
+        public synchronized void close() {
+            running = false;
+            this.notifyAll();
+
+            // Block till we actually closed
+            while(!stopped) {
+                try{wait();}catch(InterruptedException ex){ /* keep waiting */ }
+            }
+        }
+    }
+
+    /** JVM shutdown hook: closes the enclosing store when the VM exits. */
+    private class ShutdownHook extends Thread
+    {
+        public void run()
+        {
+            BerkelyDBFreenetStore.this.close();
+        }
+    }
+    
+    /**
+     * Shuts the store down: rejects new fetch()/put() calls, stops the
+     * StoreDeleter, waits a moment for in-flight operations and then closes
+     * the secondary database, the primary database and the environment.
+     * A failure while closing one handle is logged and does not prevent the
+     * remaining handles from being closed.
+     */
+    private void close() {
+        // FIXME: we should be sure all access to the database has stopped
+        //        before we try to close it. Currently we just guess.
+        //        This is nothing too problematic however since the worst
+        //        thing that should happen is that we miss the last few
+        //        store()'s and get an exception.
+        Logger.minor(this, "Closing database.");
+        boolean failed = false;
+
+        // Stop accepting new operations before anything is torn down
+        closed=true;
+
+        try{
+            storeDeleter.close();
+            // Give all threads some time to complete
+            Thread.sleep(3000);
+        }catch(Exception ex){
+            failed = true;
+            Logger.error(this,"Error while closing database.",ex);
+            ex.printStackTrace();
+        }
+
+        // The secondary database is closed before its primary
+        try{
+            chkDB_accessTime.close();
+        }catch(Exception ex){
+            failed = true;
+            Logger.error(this,"Error while closing database.",ex);
+            ex.printStackTrace();
+        }
+        try{
+            chkDB.close();
+        }catch(Exception ex){
+            failed = true;
+            Logger.error(this,"Error while closing database.",ex);
+            ex.printStackTrace();
+        }
+        try{
+            environment.close();
+        }catch(Exception ex){
+            failed = true;
+            Logger.error(this,"Error while closing database.",ex);
+            ex.printStackTrace();
+        }
+
+        if(!failed)
+            Logger.minor(this, "Closing database finished.");
+    }
+    
+    /**
+     * Counts the items in the CHK database using the leaf node count from
+     * its btree statistics.
+     * @return the number of items, or 0 if the statistics could not be read
+     */
+    private int countCHKBlocks() {
+        try{
+            Logger.minor(this, "Started counting items in database");
+
+            BtreeStats stats = (BtreeStats) chkDB.getStats(null);
+            int found = (int) stats.getLeafNodeCount();
+
+            Logger.minor(this, "Counted "+found+" items in database");
+            return found;
+        }catch(DatabaseException ex){
+            Logger.minor(this, "Exception while counting items in database",ex);
+            return 0;
+        }
+    }
+}
\ No newline at end of file


Reply via email to