Author: toad
Date: 2006-06-20 19:28:02 +0000 (Tue, 20 Jun 2006)
New Revision: 9318

Modified:
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
Don't use the node lock to serialize accesses to the datastore.
One big lock per datastore.
Also the store code is just about ready to have no big lock, but I want to be 
sure it works with a big lock first as we've been having some weird reports.
Also some minor fixes to the datastore.

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-06-20 18:52:48 UTC (rev 
9317)
+++ trunk/freenet/src/freenet/node/Node.java    2006-06-20 19:28:02 UTC (rev 
9318)
@@ -1812,17 +1812,15 @@
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
                long startTime = System.currentTimeMillis();
-               synchronized(this) {
-                       if(cache) {
-                               try {
-                                       chkDatastore.put(block);
-                               } catch (IOException e) {
-                                       Logger.error(this, "Datastore failure: 
"+e, e);
-                               }
+               if(cache) {
+                       try {
+                               chkDatastore.put(block);
+                       } catch (IOException e) {
+                               Logger.error(this, "Datastore failure: "+e, e);
                        }
-                       is = 
makeInsertSender((NodeCHK)block.getClientKey().getNodeKey(), 
-                                       MAX_HTL, uid, null, headers, prb, 
false, lm.getLocation().getValue(), cache);
                }
+               is = 
makeInsertSender((NodeCHK)block.getClientKey().getNodeKey(), 
+                               MAX_HTL, uid, null, headers, prb, false, 
lm.getLocation().getValue(), cache);
                boolean hasForwardedRejectedOverload = false;
                // Wait for status
                while(true) {
@@ -1914,19 +1912,17 @@
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
                long startTime = System.currentTimeMillis();
-               synchronized(this) {
-                       if(cache) {
-                               try {
-                                       sskDatastore.put(block, false);
-                               } catch (IOException e) {
-                                       Logger.error(this, "Datastore failure: 
"+e, e);
-                               } catch (KeyCollisionException e) {
-                                       throw new 
LowLevelPutException(LowLevelPutException.COLLISION);
-                               }
+               if(cache) {
+                       try {
+                               sskDatastore.put(block, false);
+                       } catch (IOException e) {
+                               Logger.error(this, "Datastore failure: "+e, e);
+                       } catch (KeyCollisionException e) {
+                               throw new 
LowLevelPutException(LowLevelPutException.COLLISION);
                        }
-                       is = makeInsertSender(block, 
-                                       MAX_HTL, uid, null, false, 
lm.getLocation().getValue(), cache);
                }
+               is = makeInsertSender(block, 
+                               MAX_HTL, uid, null, false, 
lm.getLocation().getValue(), cache);
                boolean hasForwardedRejectedOverload = false;
                // Wait for status
                while(true) {
@@ -1976,9 +1972,7 @@
                if(is.hasCollided()) {
                        // Store it locally so it can be fetched immediately, 
and overwrites any locally inserted.
                        try {
-                               synchronized(this) {
-                                       sskDatastore.put(is.getBlock(), true);
-                               }
+                               sskDatastore.put(is.getBlock(), true);
                        } catch (KeyCollisionException e) {
                                // Impossible
                        } catch (IOException e) {
@@ -2368,7 +2362,7 @@
                }
        }

-       public synchronized SSKBlock fetch(NodeSSK key) {
+       public SSKBlock fetch(NodeSSK key) {
                // Can we just lock on sskDatastore here?  **FIXME**
                try {
                        return sskDatastore.fetch(key, false);
@@ -2378,8 +2372,7 @@
                }
        }

-       public synchronized CHKBlock fetch(NodeCHK key) {
-               // Can we just lock on chkDatastore here?  **FIXME**
+       public CHKBlock fetch(NodeCHK key) {
                try {
                        return chkDatastore.fetch(key, false);
                } catch (IOException e) {
@@ -2391,8 +2384,7 @@
        /**
         * Store a datum.
         */
-       public synchronized void store(CHKBlock block) {
-               // Can we just lock on chkDatastore here?  **FIXME**
+       public void store(CHKBlock block) {
                try {
                        chkDatastore.put(block);
                } catch (IOException e) {
@@ -2400,7 +2392,7 @@
                }
        }

-       public synchronized void store(SSKBlock block) throws 
KeyCollisionException {
+       public void store(SSKBlock block) throws KeyCollisionException {
                try {
                        sskDatastore.put(block, false);
                        cacheKey(((NodeSSK)block.getKey()).getPubKeyHash(), 
((NodeSSK)block.getKey()).getPubKey());

Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-06-20 
18:52:48 UTC (rev 9317)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-06-20 
19:28:02 UTC (rev 9318)
@@ -44,6 +44,11 @@
  * @author tubbie
  * 
  * TODO: Fix ugly Exception handling
+ * 
+ * 
+ * FIXME:
+ * This should in theory be threadsafe without the Big Lock.
+ * Remove the big lock, when we are sure that the major issues are sorted.
  */
 public class BerkeleyDBFreenetStore implements FreenetStore {

@@ -56,6 +61,7 @@
        private final File fixSecondaryFile;

        private long chkBlocksInStore;
+       private final Object chkBlocksInStoreLock = new Object();
        private long maxChkBlocks;
        private final Database chkDB;
        private final Database chkDB_accessTime;
@@ -63,6 +69,7 @@
        private final RandomAccessFile chkStore;

        private long lastRecentlyUsed;
+       private final Object lastRecentlyUsedSync = new Object();

        private boolean closed = false;

@@ -235,8 +242,7 @@
      * @param dontPromote If true, don't promote data if fetched.
      * @return null if there is no such block stored, otherwise the block.
      */
-    public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException
-    {
+    public synchronized CHKBlock fetch(NodeCHK chk, boolean dontPromote) 
throws IOException {
        if(closed)
                return null;

@@ -261,6 +267,8 @@
                                !=OperationStatus.SUCCESS) {
                        c.close();
                        t.abort();
+                       t = null;
+                       c = null;
                        return null;
                }

@@ -272,7 +280,7 @@
                        byte[] data = new byte[dataBlockSize];
                        synchronized(chkStore) {
                                long seekTarget = 
storeBlock.offset*(long)(dataBlockSize+headerBlockSize);
-                       try {
+                               try {
                                        chkStore.seek(seekTarget);
                                } catch (IOException ioe) {
                                        if(seekTarget > (2*1024*1024*1024)) {
@@ -297,9 +305,13 @@
                                c.putCurrent(updateDBE);
                                c.close();
                                t.commit();
+                               c = null;
+                               t = null;
                        }else{
                                c.close();
                                t.abort();
+                               c = null;
+                               t = null;
                        }

                        Logger.minor(this, "Get key: "+chk);
@@ -337,8 +349,7 @@
      * @param dontPromote If true, don't promote data if fetched.
      * @return null if there is no such block stored, otherwise the block.
      */
-    public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException
-    {
+    public synchronized SSKBlock fetch(NodeSSK chk, boolean dontPromote) 
throws IOException {
        if(closed)
                return null;

@@ -355,6 +366,8 @@
                                !=OperationStatus.SUCCESS) {
                        c.close();
                        t.abort();
+                       c = null;
+                       t = null;
                        return null;
                }

@@ -373,17 +386,20 @@

                        block = new SSKBlock(data,header,chk, true);

-                       if(!dontPromote)
-                       {
+                       if(!dontPromote) {
                                storeBlock.updateRecentlyUsed();
                                DatabaseEntry updateDBE = new DatabaseEntry();
                                
storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
                                c.putCurrent(updateDBE);
                                c.close();
                                t.commit();
+                               c = null;
+                               t = null;
                        }else{
                                c.close();
                                t.abort();
+                               c = null;
+                               t = null;
                        }

                        Logger.minor(this, "Get key: "+chk);
@@ -398,6 +414,8 @@
                        c.putCurrent(updateDBE);
                        c.close();
                        t.commit();
+                       c = null;
+                       t = null;
                    return null;
                }
                return block;
@@ -419,7 +437,7 @@

     // FIXME do this with interfaces etc.

-    public DSAPublicKey fetchPubKey(byte[] hash, boolean dontPromote) throws 
IOException {
+    public synchronized DSAPublicKey fetchPubKey(byte[] hash, boolean 
dontPromote) throws IOException {
        return fetchPubKey(hash, null, dontPromote);
     }

@@ -429,8 +447,7 @@
      * @param replacement If non-null, and the data exists but is corrupt, 
replace it with this.
      * @return null if there is no such block stored, otherwise the block.
      */
-    public DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey replacement, 
boolean dontPromote) throws IOException
-    {
+    public synchronized DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey 
replacement, boolean dontPromote) throws IOException {
        if(closed)
                return null;

@@ -447,6 +464,8 @@
                                !=OperationStatus.SUCCESS) {
                        c.close();
                        t.abort();
+                       c = null;
+                       t = null;
                        return null;
                }

@@ -478,6 +497,8 @@
                                Logger.error(this, "Could not read key");
                                c.close();
                                t.abort();
+                               c = null;
+                               t = null;
                                return null;
                        }

@@ -498,6 +519,8 @@
                                        c.putCurrent(updateDBE);
                                        c.close();
                                        t.commit();
+                                       t = null;
+                                       c = null;
                                        return null;
                                }
                        }
@@ -505,6 +528,8 @@
                        // Finished, commit.
                        c.close();
                        t.commit();
+                       c = null;
+                       t = null;

                        Logger.minor(this, "Get key: 
"+HexUtil.bytesToHex(hash));
                    Logger.minor(this, "Data: "+data.length+" bytes, hash 
"+data);
@@ -526,7 +551,7 @@
 //     return null;
     }

-    public void put(CHKBlock b) throws IOException {
+    public synchronized void put(CHKBlock b) throws IOException {
                NodeCHK chk = (NodeCHK) b.getKey();
                CHKBlock oldBlock = fetch(chk, false);
                if(oldBlock != null)
@@ -534,7 +559,7 @@
                innerPut(b);
     }

-    public void put(SSKBlock b, boolean overwrite) throws IOException, 
KeyCollisionException {
+    public synchronized void put(SSKBlock b, boolean overwrite) throws 
IOException, KeyCollisionException {
                NodeSSK ssk = (NodeSSK) b.getKey();
                SSKBlock oldBlock = fetch(ssk, false);
                if(oldBlock != null) {
@@ -542,18 +567,15 @@
                                if(!overwrite)
                                        throw new KeyCollisionException();
                                else {
-                                       if(overwrite(b)) {
-                                               fetch(ssk, false); // promote it
-                                               return;
-                                       }
+                                       overwrite(b);
                                }
                        }
-                       return;
+               } else {
+                       innerPut(b);
                }
-               innerPut(b);
     }

-    private boolean overwrite(SSKBlock b) throws IOException {
+    private synchronized boolean overwrite(SSKBlock b) throws IOException {
        NodeSSK chk = (NodeSSK) b.getKey();
        byte[] routingkey = chk.getRoutingKey();
        DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
@@ -563,7 +585,8 @@
        try{
                t = environment.beginTransaction(null,null);
                c = chkDB.openCursor(t,null);
-               
+
+               // Lock the record.
                if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
                                !=OperationStatus.SUCCESS) {
                        c.close();
@@ -577,12 +600,22 @@
                byte[] data = b.getRawData();
                synchronized(chkStore) {
                        
chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-                       
                        chkStore.write(header);
                        chkStore.write(data);
                }
-                       
+               
+               // Unlock record.
+               c.close();
+               c = null;
+               t.commit();
+               t = null;
+               
            } catch(Throwable ex) {  // FIXME: ugly  
+               checkSecondaryDatabaseError(ex);
+               Logger.error(this, "Caught "+ex, ex);
+               ex.printStackTrace();
+               throw new IOException(ex.getMessage());
+        } finally {
                if(c!=null) {
                        try{c.close();}catch(DatabaseException ex2){}

@@ -590,11 +623,7 @@
                if(t!=null) {
                        try{t.abort();}catch(DatabaseException ex2){}
                }
-       
-               checkSecondaryDatabaseError(ex);
-               Logger.error(this, "Caught "+ex, ex);
-               ex.printStackTrace();
-               throw new IOException(ex.getMessage());
+               
         }

        return true;
@@ -603,8 +632,7 @@
        /**
      * Store a block.
      */
-    void innerPut(KeyBlock block) throws IOException
-    {          
+    private synchronized void innerPut(KeyBlock block) throws IOException {    
        if(closed)
                return;

@@ -627,47 +655,57 @@
                t = environment.beginTransaction(null,null);
                DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);

-               synchronized(chkStore) {
-                       if(chkBlocksInStore<maxChkBlocks) {
-                               // Expand the store file
-                               long byteOffset = 
chkBlocksInStore*(dataBlockSize+headerBlockSize);
-                               StoreBlock storeBlock = new 
StoreBlock(chkBlocksInStore);
-                               DatabaseEntry blockDBE = new DatabaseEntry();
-                       storeBlockTupleBinding.objectToEntry(storeBlock, 
blockDBE);
-                       chkDB.put(t,routingkeyDBE,blockDBE);
-                       try {
-                         chkStore.seek(byteOffset);
-                       } catch (IOException ioe) {
-                         if(byteOffset > (2*1024*1024*1024)) {
-                           Logger.error(this, "Environment does not support 
files bigger than 2 GB?");
-                           System.out.println("Environment does not support 
files bigger than 2 GB? (exception to follow)");
-                         }
-                         Logger.error(this, "Caught IOException on 
chkStore.seek("+byteOffset+")");
-                             throw ioe;
-                       }
-                       chkStore.write(header);
-                       chkStore.write(data);
-                       t.commit();
-                       chkBlocksInStore++;
-                       }else{
-                               // Overwrite an other block
-                               Cursor c = chkDB_accessTime.openCursor(t,null);
-                               DatabaseEntry keyDBE = new DatabaseEntry();
-                               DatabaseEntry dataDBE = new DatabaseEntry();
-                               c.getFirst(keyDBE,dataDBE,null);
-                               StoreBlock oldStoreBlock = (StoreBlock) 
storeBlockTupleBinding.entryToObject(dataDBE);
-                               c.delete();
-                               c.close();
-                               StoreBlock storeBlock = new 
StoreBlock(oldStoreBlock.getOffset());
-                               DatabaseEntry blockDBE = new DatabaseEntry();
-                               
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
-                               chkDB.put(t,routingkeyDBE,blockDBE);
-                       
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
-                       chkStore.write(header);
-                       chkStore.write(data);
-                               t.commit();
-                       }
-               }
+               if(chkBlocksInStore<maxChkBlocks) {
+                       // Expand the store file
+                       long blockNum;
+                       synchronized(chkBlocksInStoreLock) {
+                               blockNum = chkBlocksInStore;
+                               chkBlocksInStore++;
+                       }
+                       long byteOffset = 
blockNum*(dataBlockSize+headerBlockSize);
+                       StoreBlock storeBlock = new StoreBlock(blockNum);
+                       DatabaseEntry blockDBE = new DatabaseEntry();
+                       storeBlockTupleBinding.objectToEntry(storeBlock, 
blockDBE);
+                       chkDB.put(t,routingkeyDBE,blockDBE);
+                       synchronized(chkStore) {
+                               try {
+                                       chkStore.seek(byteOffset);
+                               } catch (IOException ioe) {
+                                       if(byteOffset > (2*1024*1024*1024)) {
+                                               Logger.error(this, "Environment 
does not support files bigger than 2 GB?");
+                                               System.out.println("Environment 
does not support files bigger than 2 GB? (exception to follow)");
+                                       }
+                                       Logger.error(this, "Caught IOException 
on chkStore.seek("+byteOffset+")");
+                                       throw ioe;
+                               }
+                               chkStore.write(header);
+                               chkStore.write(data);
+                       }
+                       t.commit();
+                       t = null;
+               }else{
+                       // Overwrite an other block
+                       Cursor c = chkDB_accessTime.openCursor(t,null);
+                       DatabaseEntry keyDBE = new DatabaseEntry();
+                       DatabaseEntry dataDBE = new DatabaseEntry();
+                       c.getFirst(keyDBE,dataDBE,LockMode.RMW);
+                       StoreBlock oldStoreBlock = (StoreBlock) 
storeBlockTupleBinding.entryToObject(dataDBE);
+                       c.delete();
+                       c.close();
+                       // Deleted, so we can now reuse it.
+                       // Because we acquired a write lock, nobody else has 
taken it.
+                       StoreBlock storeBlock = new 
StoreBlock(oldStoreBlock.getOffset());
+                       DatabaseEntry blockDBE = new DatabaseEntry();
+                       storeBlockTupleBinding.objectToEntry(storeBlock, 
blockDBE);
+                       chkDB.put(t,routingkeyDBE,blockDBE);
+                       synchronized(chkStore) {
+                               
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
+                               chkStore.write(header);
+                               chkStore.write(data);
+                       }
+                       t.commit();
+                       t = null;
+               }

                Logger.minor(this, "Put key: "+block.getKey());
                Logger.minor(this, "Headers: "+header.length+" bytes, hash 
"+Fields.hashCode(header));
@@ -680,11 +718,12 @@
                checkSecondaryDatabaseError(ex);
                Logger.error(this, "Caught "+ex, ex);
                ex.printStackTrace();
-               throw new IOException(ex.getMessage());
+               if(ex instanceof IOException) throw (IOException) ex;
+               else throw new IOException(ex.getMessage());
         }
     }

-    private void checkSecondaryDatabaseError(Throwable ex) {
+    private synchronized void checkSecondaryDatabaseError(Throwable ex) {
        if(ex instanceof DatabaseException && ex.getMessage().indexOf("missing 
key in the primary database") > -1) {
                try {
                                fixSecondaryFile.createNewFile();
@@ -702,7 +741,7 @@
     /**
      * Store a pubkey.
      */
-    public void put(byte[] hash, DSAPublicKey key) throws IOException {
+    public synchronized void put(byte[] hash, DSAPublicKey key) throws 
IOException {
                DSAPublicKey k = fetchPubKey(hash, key, true);
                if(k == null)
                        innerPut(hash, key);
@@ -711,8 +750,7 @@
        /**
      * Store a block.
      */
-    public void innerPut(byte[] hash, DSAPublicKey key) throws IOException
-    {          
+    private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {  
        
        if(closed)
                return;

@@ -738,31 +776,44 @@
                synchronized(chkStore) {
                        if(chkBlocksInStore<maxChkBlocks) {
                                // Expand the store file
-                               long byteOffset = 
chkBlocksInStore*(dataBlockSize+headerBlockSize);
+                               long blockNum;
+                               synchronized(chkBlocksInStoreLock) {
+                                       blockNum = chkBlocksInStore;
+                                       chkBlocksInStore++;
+                               }
+                               
+                               long byteOffset = 
blockNum*(dataBlockSize+headerBlockSize);
                                StoreBlock storeBlock = new 
StoreBlock(chkBlocksInStore);
                                DatabaseEntry blockDBE = new DatabaseEntry();
                        storeBlockTupleBinding.objectToEntry(storeBlock, 
blockDBE);
-                       chkDB.put(t,routingkeyDBE,blockDBE);
-                       chkStore.seek(byteOffset);
-                       chkStore.write(data);
+                       chkDB.put(t,routingkeyDBE,blockDBE);
+                       synchronized(chkStore) {
+                               chkStore.seek(byteOffset);
+                               chkStore.write(data);
+                       }
                        t.commit();
-                       chkBlocksInStore++;
+                       t = null;
                        }else{
                                // Overwrite an other block
                                Cursor c = chkDB_accessTime.openCursor(t,null);
                                DatabaseEntry keyDBE = new DatabaseEntry();
                                DatabaseEntry dataDBE = new DatabaseEntry();
-                               c.getFirst(keyDBE,dataDBE,null);
+                               c.getFirst(keyDBE,dataDBE,LockMode.RMW);
                                StoreBlock oldStoreBlock = (StoreBlock) 
storeBlockTupleBinding.entryToObject(dataDBE);
                                c.delete();
                                c.close();
+                               // Deleted, so we can now reuse it.
+                               // Because we acquired a write lock, nobody 
else has taken it.
                                StoreBlock storeBlock = new 
StoreBlock(oldStoreBlock.getOffset());
                                DatabaseEntry blockDBE = new DatabaseEntry();
                                
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
                                chkDB.put(t,routingkeyDBE,blockDBE);
-                       
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
-                       chkStore.write(data);
+                               synchronized(chkStore) {
+                                       
chkStore.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
+                                       chkStore.write(data);
+                               }
                                t.commit();
+                               t = null;
                        }
                }

@@ -776,22 +827,20 @@
                checkSecondaryDatabaseError(ex);
                Logger.error(this, "Caught "+ex, ex);
                ex.printStackTrace();
-               throw new IOException(ex.getMessage());
+               if(ex instanceof IOException) throw (IOException) ex;
+               else throw new IOException(ex.getMessage());
         }
     }

-    private class StoreBlock
-    {
+    private class StoreBlock {
        private long recentlyUsed;
        private long offset;

-       public StoreBlock(long offset)
-       {
+       public StoreBlock(long offset) {
                this(offset,getNewRecentlyUsed());
        }

-       public StoreBlock(long offset,long recentlyUsed)
-       {
+       public StoreBlock(long offset,long recentlyUsed) {
                this.offset = offset;
                this.recentlyUsed = recentlyUsed;
        }
@@ -801,13 +850,11 @@
                return recentlyUsed;
        }

-       public void setRecentlyUsedToZero()
-       {
+       public void setRecentlyUsedToZero() {
                recentlyUsed = 0;
        }

-       public void updateRecentlyUsed()
-       {
+       public void updateRecentlyUsed() {
                recentlyUsed = getNewRecentlyUsed();
        }

@@ -819,8 +866,7 @@
     /**
      * Convert StoreBlock's to the format used by the database
      */
-    private class StoreBlockTupleBinding extends TupleBinding
-    {
+    private class StoreBlockTupleBinding extends TupleBinding {

        public void objectToEntry(Object object, TupleOutput to)  {
                StoreBlock myData = (StoreBlock)object;
@@ -882,8 +928,7 @@

     }

-    private class ShutdownHook extends Thread
-    {
+    private class ShutdownHook extends Thread {
        public void run() {
                close();
        }
@@ -938,8 +983,7 @@
        return count;
     }

-    private long getMaxRecentlyUsed()
-    {
+    private long getMaxRecentlyUsed() {
        long maxRecentlyUsed = 0;

        try{
@@ -956,12 +1000,14 @@
        return maxRecentlyUsed;
     }

-    private synchronized long getNewRecentlyUsed() {
-       lastRecentlyUsed++;
-       return lastRecentlyUsed;
+    private long getNewRecentlyUsed() {
+       synchronized(lastRecentlyUsedSync) {
+               lastRecentlyUsed++;
+               return lastRecentlyUsed;
+       }
     }

        public void setMaxKeys(long maxStoreKeys) {
                maxChkBlocks = maxStoreKeys;
        }
-}
\ No newline at end of file
+}


Reply via email to