Author: j16sdiz
Date: 2008-04-26 03:57:34 +0000 (Sat, 26 Apr 2008)
New Revision: 19567

Modified:
   trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
use FileChannel.read()/write() in BDBFS


Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2008-04-25 
23:18:06 UTC (rev 19566)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2008-04-26 
03:57:34 UTC (rev 19567)
@@ -4,6 +4,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -79,6 +81,9 @@
        private RandomAccessFile storeRAF;
        private RandomAccessFile keysRAF;
        private RandomAccessFile lruRAF;
+       private FileChannel storeFC;
+       private FileChannel keysFC;
+       private FileChannel lruFC;
        private final SortedLongSet freeBlocks;
        private final String name;
        /** Callback which translates records to blocks and back, specifies the 
size of blocks etc. */
@@ -252,17 +257,20 @@
                                if(!storeFile.createNewFile())
                                        throw new IOException("Can't create a 
new file " + storeFile + " !");
                        storeRAF = new RandomAccessFile(storeFile,"rw");
+                       storeFC = storeRAF.getChannel();

                        if(!lruFile.exists()) 
                                if(!lruFile.createNewFile())
                                        throw new IOException("Can't create a 
new file " + lruFile + " !");
                        lruRAF = new RandomAccessFile(lruFile,"rw");
+                       lruFC = lruRAF.getChannel();

                        if(keysFile != null) {
                                if(!keysFile.exists())
                                        if(!keysFile.createNewFile())
                                                throw new IOException("Can't 
create a new file " + keysFile + " !");
                                keysRAF = new RandomAccessFile(keysFile,"rw");
+                               keysFC = keysRAF.getChannel();
                        } else keysRAF = null;

                        if (wipe) {
@@ -569,7 +577,7 @@

                WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE, 
(5*60*1000 + wantedMoveNums.length*1000L + alreadyDropped.size() * 100L))); // 
1 per second

-               byte[] buf = new byte[headerBlockSize + dataBlockSize];
+               ByteBuffer buf = ByteBuffer.allocate(headerBlockSize + 
dataBlockSize);
                long lruValue;
                byte[] keyBuf = new byte[keyLength];
                t = null;
@@ -622,18 +630,21 @@
                                boolean readLRU = false;
                                boolean readKey = false;
                                try {
-                                       storeRAF.seek(entry * (headerBlockSize 
+ dataBlockSize));
-                                       storeRAF.readFully(buf);
+                                       buf.rewind();
+                                       do {
+                                               int byteRead = 
storeFC.read(buf, entry * (headerBlockSize + dataBlockSize) + buf.position());
+                                               if (byteRead == -1)
+                                                       throw new 
EOFException();
+                                       } while (buf.hasRemaining());
+                                       buf.flip();
                                        lruValue = 0;
                                        if(lruRAF.length() > ((entry + 1) * 8)) 
{
                                                readLRU = true;
-                                               lruRAF.seek(entry * 8);
-                                               lruValue = lruRAF.readLong();
+                                               lruValue = fcReadLRU(entry);
                                        }
                                        if(keysRAF != null && keysRAF.length() 
> ((entry + 1) * keyLength)) {
                                                readKey = true;
-                                               keysRAF.seek(entry * keyLength);
-                                               keysRAF.readFully(keyBuf);
+                                               fcReadKey(entry, keyBuf);
                                        }
                                } catch (EOFException e) {
                                        System.err.println("Was reading 
"+wantedBlock+" to write to "+unwantedBlock);
@@ -642,15 +653,16 @@
                                        throw e;
                                }
                                entry = unwantedBlock.longValue();
-                               storeRAF.seek(entry * (headerBlockSize + 
dataBlockSize));
-                               storeRAF.write(buf);
+                               do {
+                                       int byteWritten = storeFC.write(buf, 
entry * (headerBlockSize + dataBlockSize) + buf.position());
+                                       if (byteWritten == -1)
+                                               throw new EOFException();
+                               } while (buf.hasRemaining());
                                if(readLRU) {
-                                       lruRAF.seek(entry * 8);
-                                       lruRAF.writeLong(lruValue);
+                                       fcWriteLRU(entry, lruValue);
                                }
                                if(readKey) {
-                                       keysRAF.seek(entry * keyLength);
-                                       keysRAF.write(keyBuf);
+                                       fcWriteKey(entry, keyBuf);
                                }

                                // Update the database w.r.t. the old block.
@@ -1158,11 +1170,7 @@
                                byte[] header = new byte[headerBlockSize];
                                byte[] data = new byte[dataBlockSize];
                                try {
-                                       synchronized(storeRAF) {
-                                               
storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-                                               storeRAF.readFully(header);
-                                               storeRAF.readFully(data);
-                                       }
+                                       fcReadStore(storeBlock.offset, header, 
data);
                                } catch (EOFException e) {
                                        Logger.error(this, "No block");
                                        c.close();
@@ -1201,16 +1209,15 @@
                                                
keysDB.put(t,routingkeyDBE,blockDBE);
                                                if(fullKey == null)
                                                        fullKey = 
block.getFullKey();
-                                               synchronized(storeRAF) {
+                                               
                                                        if(keysRAF != null) {
-                                                               
keysRAF.seek(storeBlock.offset * keyLength);
-                                                               
keysRAF.write(fullKey);
+                                                               
fcWriteKey(storeBlock.offset, fullKey);
                                                                if(logDEBUG)
                                                                        
Logger.debug(this, "Written full key length "+fullKey.length+" to block 
"+storeBlock.offset+" at "+(storeBlock.offset * keyLength)+" for "+callback);
                                                        } else if(logDEBUG) {
                                                                
Logger.debug(this, "Not writing full key length "+fullKey.length+" for block 
"+storeBlock.offset+" for "+callback);
                                                        }
-                                               }
+                                               
                                        } catch (DatabaseException e) {
                                                Logger.error(this, "Caught 
database exception "+e+" while replacing element");
                                                addFreeBlock(storeBlock.offset, 
true, "Bogus key");
@@ -1237,10 +1244,7 @@
                                        c = null;
                                        t.commit();
                                        t = null;
-                                       synchronized(storeRAF) {
-                                               lruRAF.seek(storeBlock.offset * 
8);
-                                               
lruRAF.writeLong(storeBlock.recentlyUsed);
-                                       }
+                                       fcWriteLRU(storeBlock.offset, 
storeBlock.recentlyUsed);
                                } else {
                                        c.close();
                                        c = null;
@@ -1258,16 +1262,14 @@
                                synchronized(this) {
                                        misses++;
                                }
-                               synchronized(storeRAF) {
+                               
                                        // Clear the key in the keys file.
                                        byte[] buf = new byte[keyLength];
                                        for(int i=0;i<buf.length;i++) buf[i] = 
0; // FIXME unnecessary?
                                        if(keysRAF != null) {
-                                               keysRAF.seek(storeBlock.offset 
* keyLength);
-                                               keysRAF.write(buf);
+                                               fcWriteKey(storeBlock.offset, 
buf);
                                        }
-                               }
-
+                               
                                keysDB.delete(t, routingkeyDBE);

                                // Insert the block into the index with a 
random key, so that it's part of the LRU.
@@ -1394,14 +1396,9 @@

                        StoreBlock storeBlock = (StoreBlock) 
storeBlockTupleBinding.entryToObject(blockDBE);

-                       synchronized(storeRAF) {
-                               
storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-                               storeRAF.write(header);
-                               storeRAF.write(data);
-                               if(keysRAF != null) {
-                                       keysRAF.seek(storeBlock.offset * 
keyLength);
-                                       keysRAF.write(fullKey);
-                               }
+                       fcWriteStore(storeBlock.offset, header, data);
+                       if (keysRAF != null) {
+                               fcWriteKey(storeBlock.offset, fullKey);
                        }

                        // Unlock record.
@@ -1522,22 +1519,17 @@
                DatabaseEntry blockDBE = new DatabaseEntry();
                storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
                keysDB.put(t,routingkeyDBE,blockDBE);
-               synchronized(storeRAF) {
-                       
storeRAF.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
-                       storeRAF.write(header);
-                       storeRAF.write(data);
-                       lruRAF.seek(storeBlock.getOffset() * 8);
-                       lruRAF.writeLong(storeBlock.recentlyUsed);
-                       if(keysRAF != null) {
-                               keysRAF.seek(storeBlock.getOffset() * 
keyLength);
-                               keysRAF.write(fullKey);
-                       }
+               
+               fcWriteStore(storeBlock.getOffset(), header, data);
+               fcWriteLRU( storeBlock.getOffset(),storeBlock.recentlyUsed);
+               if (keysRAF != null)
+                       fcWriteKey(storeBlock.getOffset(), fullKey);
+               synchronized (this) {
                        writes++;
                }
        }

        private boolean writeNewBlock(long blockNum, byte[] header, byte[] 
data, Transaction t, DatabaseEntry routingkeyDBE, byte[] fullKey) throws 
DatabaseException, IOException {
-               long byteOffset = blockNum*(dataBlockSize+headerBlockSize);
                StoreBlock storeBlock = new StoreBlock(this, blockNum);
                long lruValue = storeBlock.recentlyUsed;
                DatabaseEntry blockDBE = new DatabaseEntry();
@@ -1562,31 +1554,20 @@
                                throw e;
                        }
                }
-               synchronized(storeRAF) {
-                       try {
-                               storeRAF.seek(byteOffset);
-                       } catch (IOException ioe) {
-                               if(byteOffset > (2l*1024*1024*1024)) {
-                                       Logger.error(this, "Environment does 
not support files bigger than 2 GB?");
-                                       System.out.println("Environment does 
not support files bigger than 2 GB? (exception to follow)");
-                               }
-                               Logger.error(this, "Caught IOException on 
storeRAF.seek("+byteOffset+ ')');
-                               throw ioe;
-                       }
-                       storeRAF.write(header);
-                       storeRAF.write(data);
-                       lruRAF.seek(blockNum * 8);
-                       lruRAF.writeLong(lruValue);
+               
+                       fcWriteStore(blockNum, header, data);
+                       fcWriteLRU(blockNum, lruValue);
                        if(keysRAF != null) {
-                               keysRAF.seek(blockNum * keyLength);
-                               keysRAF.write(fullKey);
+                               fcWriteKey(blockNum, fullKey);
                                if(logDEBUG)
                                        Logger.debug(this, "Written full key 
length "+fullKey.length+" to block "+blockNum+" at "+(blockNum * keyLength)+" 
for "+callback);
                        } else if(logDEBUG) {
                                Logger.debug(this, "Not writing full key length 
"+fullKey.length+" for block "+blockNum+" for "+callback);
                        }
+                       synchronized (this) {
                        writes++;
                }
+               
                return true;
        }

@@ -1802,7 +1783,7 @@
        private static void flushAndCloseRAF(RandomAccessFile file) {
                try {
                        if (file != null)
-                               file.getFD().sync();
+                               file.getChannel().force(true);
                } catch (IOException e) {
                        // ignore
                }
@@ -2144,6 +2125,67 @@
                return db;
        }

+       private void fcWriteLRU(long entry, long data) throws IOException {
+               ByteBuffer bf = ByteBuffer.allocateDirect(8);
+               bf.putLong(data);
+               bf.flip();
+               do {
+                       int byteWritten = lruFC.write(bf, entry * 8 + 
bf.position());
+                       if (byteWritten == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+       }
+       private long fcReadLRU(long entry) throws IOException {
+               ByteBuffer bf = ByteBuffer.allocateDirect(8);
+               do {
+                       int byteRead = lruFC.read(bf, entry * 8 + 
bf.position());
+                       if (byteRead == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+               bf.flip();
+               return bf.getLong();
+       }
+       private void fcReadKey(long entry, byte[] data) throws IOException {    
+               ByteBuffer bf = ByteBuffer.wrap(data);
+               do {
+                       int byteRead = keysFC.read(bf, entry * keyLength + 
bf.position());
+                       if (byteRead == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+       }
+       private void fcWriteKey(long entry, byte[] data) throws IOException {
+               ByteBuffer bf = ByteBuffer.wrap(data);
+               do {
+                       int byteWritten = keysFC.write(bf, entry * keyLength + 
bf.position());
+                       if (byteWritten == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+       }
+       private void fcWriteStore(long entry, byte[] header, byte[] data) 
throws IOException {
+               ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize + 
dataBlockSize);
+               bf.put(header);
+               bf.put(data);
+               bf.flip();
+               do {
+                       int byteWritten = storeFC.write(bf, (headerBlockSize + 
dataBlockSize) * entry + bf.position());
+                       if (byteWritten == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+       }
+       private void fcReadStore(long entry,byte[] header, byte[] data ) throws 
IOException {
+               ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize + 
dataBlockSize);
+               
+               do {
+                       int dataRead = storeFC.read(bf, (headerBlockSize + 
dataBlockSize) * entry);
+                       if (dataRead == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+               
+               bf.flip();
+               bf.get(header);
+               bf.get(data);
+       }
+       
     public void handleOOM() throws Exception {
                if (storeRAF != null)
                        storeRAF.getFD().sync();


Reply via email to