Author: tubbie
Date: 2005-12-15 09:31:34 +0000 (Thu, 15 Dec 2005)
New Revision: 7710

Modified:
   branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
Log:
The data and headers are now stored in a seperate file

Modified: branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
===================================================================
--- branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java   2005-12-12 
22:27:27 UTC (rev 7709)
+++ branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java   2005-12-15 
09:31:34 UTC (rev 7710)
@@ -3,6 +3,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.RandomAccessFile;

 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
@@ -35,27 +36,31 @@
  * @author tubbie
  * 
  * TODO: Fix ugly Exception handling
+ * TODO: Don't use timestamps
  */
 public class BerkelyDBFreenetStore implements FreenetStore {

+    static final int CHK_DATA_BLOCK_SIZE = 32*1024;
+    static final int CHK_HEADER_BLOCK_SIZE = 36;
+       
        private final Environment environment;
        private final TupleBinding storeBlockTupleBinding;
        private final TupleBinding longTupleBinding;
+       
+       private int chkBlocksInStore;
+       private final int maxChkBlocks;
        private final Database chkDB;
        private final Database chkDB_accessTime;
+       private final RandomAccessFile chkStore;

        private boolean closed = false;

-       private final StoreDeleter storeDeleter;
-       
-       private final int maxBlocks;
-       
        /**
      * Initializes database
      * @param the directory where the store is located
      * @throws FileNotFoundException if the dir does not exist and could not 
be created
      */
-       public BerkelyDBFreenetStore(String storeDir,int maxBlocks) throws 
Exception
+       public BerkelyDBFreenetStore(String storeDir,int maxChkBlocks) throws 
Exception
        {
                // Percentage of the database that must contain usefull data
                // decrease to increase performance, increase to save disk space
@@ -67,7 +72,7 @@
                // Percentage of the maximum heap size used as a cache
                System.setProperty("je.maxMemoryPercent","30");

-               this.maxBlocks=maxBlocks;
+               this.maxChkBlocks=maxChkBlocks;

                // Initialize environment
                EnvironmentConfig envConfig = new EnvironmentConfig();
@@ -77,8 +82,11 @@
                File dir = new File(storeDir);
                if(!dir.exists())
                        dir.mkdir();
+               File dbDir = new File(dir,"database");
+               if(!dbDir.exists())
+                       dbDir.mkdir();

-               environment = new Environment(new File(storeDir), envConfig);
+               environment = new Environment(dbDir, envConfig);

                // Initialize CHK database
                DatabaseConfig dbConfig = new DatabaseConfig();
@@ -99,12 +107,16 @@
                chkDB_accessTime = environment.openSecondaryDatabase
                                                        (null, 
"CHK_accessTime", chkDB, secDbConfig);

-               // Initialize thread that deletes items when the store gets full
-               storeDeleter = new StoreDeleter();
-               (new Thread(storeDeleter,"BDBStore: StoreDeleter")).start();
+               // Initialize the store file
+               File storeFile = new File(dir,"store");
+               if(!storeFile.exists())
+                       storeFile.createNewFile();
+               chkStore = new RandomAccessFile(storeFile,"rw");

                // Add shutdownhook
                Runtime.getRuntime().addShutdownHook(new ShutdownHook());
+               
+               chkBlocksInStore = countCHKBlocks();
        }

        /**
@@ -127,6 +139,8 @@

                if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.DEFAULT)
                                !=OperationStatus.SUCCESS) {
+                       c.close();
+                       t.abort();
                        return null;
                }

@@ -134,8 +148,17 @@

                CHKBlock block = null;
                try{
-                       block = new 
CHKBlock(storeBlock.getData(),storeBlock.getHeader(),chk);
+                       byte[] header = new byte[CHK_HEADER_BLOCK_SIZE];
+                       byte[] data = new byte[CHK_DATA_BLOCK_SIZE];
+                       synchronized(chkStore) {
+                               
chkStore.seek(storeBlock.offset*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+                               chkStore.read(header);
+                               chkStore.read(data);
+                       }

+                       
+                       block = new CHKBlock(data,header,chk);
+                       
                        if(!dontPromote)
                        {
                                storeBlock.updateAccessTime();
@@ -144,24 +167,30 @@
                                c.putCurrent(updateDBE);
                                c.close();
                                t.commit();
+                       }else{
+                               c.close();
+                               t.abort();
                        }

                        Logger.minor(this, "Get key: "+chk);
-                   Logger.minor(this, "Headers: 
"+storeBlock.getHeader().length+" bytes, hash 
"+Fields.hashCode(storeBlock.getHeader()));
-                   Logger.minor(this, "Data: "+storeBlock.getData().length+" 
bytes, hash "+Fields.hashCode(storeBlock.getData()));
+                   Logger.minor(this, "Headers: "+header.length+" bytes, hash 
"+header);
+                   Logger.minor(this, "Data: "+data.length+" bytes, hash 
"+data);

                }catch(CHKVerifyException ex){
-                       Logger.normal(this, "Does not verify, deleting: "+chk);
-                       c.delete();
+                       Logger.normal(this, "Does not verify, setting 
accessTime to 0 for : "+chk);
+                       storeBlock.setAccessTime(0);
+                       DatabaseEntry updateDBE = new DatabaseEntry();
+                       storeBlockTupleBinding.objectToEntry(storeBlock, 
updateDBE);
+                       c.putCurrent(updateDBE);
                        c.close();
-                       t.abort();
-                       storeDeleter.deletedItem();
+                       t.commit();
                    return null;
                }
                return block;
-       }catch(DatabaseException ex) {  // FIXME: ugly  
+       }catch(Exception ex) {  // FIXME: ugly  
                if(c!=null)
                        try{c.close();}catch(DatabaseException ex2){}
+                       ex.printStackTrace();
                new IOException(ex.getMessage());
         }

@@ -175,28 +204,63 @@
     {          
        if(closed)
                return;
-       
+               
        byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
         byte[] data = block.getData();
         byte[] header = block.getHeader();
-        StoreBlock storeBlock = new StoreBlock(data,header);
-
+        
+        if(data.length!=CHK_DATA_BLOCK_SIZE) {
+               Logger.minor(this, "This data is "+data.length+" bytes. Should 
be "+CHK_DATA_BLOCK_SIZE);
+               return;
+        }
+        if(header.length!=CHK_HEADER_BLOCK_SIZE) {
+               Logger.minor(this, "This header is "+data.length+" bytes. 
Should be "+CHK_HEADER_BLOCK_SIZE);
+               return;
+        }
+        
         Transaction t = null;
+        
         try{
-               DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
-               
-               DatabaseEntry blockDBE = new DatabaseEntry();
-               storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
-               
-               t = environment.beginTransaction(null,null); 
+               t = environment.beginTransaction(null,null);
+               DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+               
+               synchronized(chkStore) {
+                       if(chkBlocksInStore<maxChkBlocks) {
+                               // Expand the store file
+                               int byteOffset = 
chkBlocksInStore*(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE);
+                               StoreBlock storeBlock = new 
StoreBlock(chkBlocksInStore);
+                               DatabaseEntry blockDBE = new DatabaseEntry();
+                       storeBlockTupleBinding.objectToEntry(storeBlock, 
blockDBE);
+                       chkDB.put(t,routingkeyDBE,blockDBE);
+                       chkStore.seek(byteOffset);
+                       chkStore.write(header);
+                       chkStore.write(data);
+                       t.commit();
+                       chkBlocksInStore++;
+                       }else{
+                               // Overwrite an other block
+                               Cursor c = chkDB_accessTime.openCursor(t,null);
+                               DatabaseEntry keyDBE = new DatabaseEntry();
+                               DatabaseEntry dataDBE = new DatabaseEntry();
+                               c.getFirst(keyDBE,dataDBE,null);
+                               StoreBlock oldStoreBlock = (StoreBlock) 
storeBlockTupleBinding.entryToObject(dataDBE);
+                               c.delete();
+                               c.close();
+                               StoreBlock storeBlock = new 
StoreBlock(oldStoreBlock.getOffset());
+                               DatabaseEntry blockDBE = new DatabaseEntry();
+                               
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
+                               chkDB.put(t,routingkeyDBE,blockDBE);
+                       
chkStore.seek(storeBlock.getOffset()*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+                       chkStore.write(header);
+                       chkStore.write(data);
+                               t.commit();
+                       }
+               }
+               
                Logger.minor(this, "Put key: "+block.getKey());
                Logger.minor(this, "Headers: "+header.length+" bytes, hash 
"+Fields.hashCode(header));
                Logger.minor(this, "Data: "+data.length+" bytes, hash 
"+Fields.hashCode(data));
-               chkDB.put(t,routingkeyDBE,blockDBE);
-               t.commit();
-               
-               storeDeleter.addedItem();
-               
+                
         }catch(Exception ex) {  // FIXME: ugly  
                if(t!=null){
                        try{t.abort();}catch(DatabaseException ex2){};
@@ -208,31 +272,36 @@

     private class StoreBlock
     {
-       private final byte[] data, header;
        private long lastAccessed;
-       public StoreBlock(byte[] data,byte[] header)
+       private int offset;
+       
+       public StoreBlock(int offset)
        {
-               this(data,header,System.currentTimeMillis());
+               this(offset,System.currentTimeMillis());
        }

-       public StoreBlock(byte[] data,byte[] header,long lastAccessed)
+       public StoreBlock(int offset,long lastAccessed)
        {
-               this.data=data;
-               this.header=header;
+               this.offset = offset;
                this.lastAccessed = lastAccessed;
        }
-       
-       public byte[] getData()         {return data;}
-       public byte[] getHeader()       {return header;}
-       
+               
        public void updateAccessTime() {
                lastAccessed = System.currentTimeMillis();
        }

-       public long getLastAccessed()
-       {
+       public long getLastAccessed() {
                return lastAccessed;
        }
+       
+       public void setAccessTime(long time)
+       {
+               lastAccessed = time;
+       }
+       
+       public int getOffset() {
+               return offset;
+       }
     }

     /**
@@ -244,41 +313,17 @@
        public void objectToEntry(Object object, TupleOutput to)  {
                StoreBlock myData = (StoreBlock)object;

-               writeByteArray(myData.getHeader(),to);
-               writeByteArray(myData.getData(),to);
+               to.writeInt(myData.getOffset());
                to.writeLong(myData.getLastAccessed());
-
        }

        public Object entryToObject(TupleInput ti) {
-               byte[] header = readByteArray(ti);
-               byte[] data = readByteArray(ti);
+               int offset = ti.readInt();
                long lastAccessed = ti.readLong();

-               StoreBlock storeBlock = new 
StoreBlock(data,header,lastAccessed);
+               StoreBlock storeBlock = new StoreBlock(offset,lastAccessed);
                return storeBlock;
        }
-       
-       private byte[] readByteArray(TupleInput ti) {
-               try{
-                       char size = ti.readChar();
-                       byte[] data = new byte[size];
-                       char read = 0;
-                       while(read<size)
-                               read +=ti.read(data,read,size-read);
-                       
-                       return data;
-               }catch(IOException ex){}
-               return null;
-       }
-       
-       private void writeByteArray(byte[] data,TupleOutput to)
-       {
-               try {
-                       to.writeChar(data.length);
-                       to.write(data);
-               }catch(IOException ex){}
-       }
     }

     /**
@@ -303,85 +348,6 @@
        }
     }

-    /** 
-     * Cleans items from the store when it's full
-     */
-    private class StoreDeleter implements Runnable
-    {
-       private boolean running=true;
-       private boolean stopped=false;
-       private int currentItems;
-       
-       public StoreDeleter() {
-               currentItems = countCHKBlocks();
-       }
-       
-       
-       
-       public synchronized void addedItem() {
-               currentItems++;
-               this.notify();
-       }
-       
-       public synchronized void deletedItem() {
-               currentItems--;
-       }
-       
-       public void run() {
-               while(running){         
-                       synchronized(this) {
-                               while(currentItems < maxBlocks && running)
-                                       try{wait();}catch(InterruptedException 
ex){}
-                       }
-                       while(currentItems>maxBlocks){
-                               removeOldestItem();
-                       }
-               }
-
-               stopped=true;
-               synchronized(this){
-                       this.notify();
-               }
-       }
-       
-       private void removeOldestItem() {
-               Logger.normal(this, "Deleting oldest item in store");
-               Cursor c = null;
-               try{
-                       Transaction t = environment.beginTransaction(null,null);
-                       c = chkDB_accessTime.openCursor(t,null);
-                       
-                       DatabaseEntry keyDBE = new DatabaseEntry();
-                       DatabaseEntry dataDBE = new DatabaseEntry();
-                       
-                       
if(c.getFirst(keyDBE,dataDBE,null).equals(OperationStatus.SUCCESS))
-                       {
-                               c.delete();
-                               c.close();
-                               t.commit();
-                               deletedItem();
-                       }else{
-                               t.abort();
-                       }
-
-                       
-               }catch(Exception ex) {
-                       ex.printStackTrace();
-                       if(c!=null)
-                               try{c.close();}catch(DatabaseException ex2){}
-                       }
-        }
-       
-       public synchronized void close() {
-               running = false;
-               this.notify();
-               
-               // Block till we actually closed
-               while(!stopped)
-                       try{wait();}catch(InterruptedException ex){}
-       }
-    }
-
     private class ShutdownHook extends Thread
     {
        public void run() {
@@ -396,10 +362,10 @@
                //                      This is nothing too problematic however 
since the worst thing that should
                //                      happen is that we miss the last few 
store()'s and get an exception.
                        Logger.minor(this, "Closing database.");
-                       storeDeleter.close();
                        closed=true;
                        // Give all threads some time to complete
-                       Thread.sleep(3000);
+                       Thread.sleep(5000);
+                       chkStore.close();
                chkDB_accessTime.close();
                chkDB.close();
                environment.close();


Reply via email to