Author: j16sdiz
Date: 2008-04-26 03:57:34 +0000 (Sat, 26 Apr 2008)
New Revision: 19567
Modified:
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
use FileChannel.read()/write() in BDBFS
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2008-04-25
23:18:06 UTC (rev 19566)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2008-04-26
03:57:34 UTC (rev 19567)
@@ -4,6 +4,8 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -79,6 +81,9 @@
private RandomAccessFile storeRAF;
private RandomAccessFile keysRAF;
private RandomAccessFile lruRAF;
+ private FileChannel storeFC;
+ private FileChannel keysFC;
+ private FileChannel lruFC;
private final SortedLongSet freeBlocks;
private final String name;
/** Callback which translates records to blocks and back, specifies the
size of blocks etc. */
@@ -252,17 +257,20 @@
if(!storeFile.createNewFile())
throw new IOException("Can't create a
new file " + storeFile + " !");
storeRAF = new RandomAccessFile(storeFile,"rw");
+ storeFC = storeRAF.getChannel();
if(!lruFile.exists())
if(!lruFile.createNewFile())
throw new IOException("Can't create a
new file " + lruFile + " !");
lruRAF = new RandomAccessFile(lruFile,"rw");
+ lruFC = lruRAF.getChannel();
if(keysFile != null) {
if(!keysFile.exists())
if(!keysFile.createNewFile())
throw new IOException("Can't
create a new file " + keysFile + " !");
keysRAF = new RandomAccessFile(keysFile,"rw");
+ keysFC = keysRAF.getChannel();
} else keysRAF = null;
if (wipe) {
@@ -569,7 +577,7 @@
WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE,
(5*60*1000 + wantedMoveNums.length*1000L + alreadyDropped.size() * 100L))); //
1 per second
- byte[] buf = new byte[headerBlockSize + dataBlockSize];
+ ByteBuffer buf = ByteBuffer.allocate(headerBlockSize +
dataBlockSize);
long lruValue;
byte[] keyBuf = new byte[keyLength];
t = null;
@@ -622,18 +630,21 @@
boolean readLRU = false;
boolean readKey = false;
try {
- storeRAF.seek(entry * (headerBlockSize
+ dataBlockSize));
- storeRAF.readFully(buf);
+ buf.rewind();
+ do {
+ int byteRead =
storeFC.read(buf, entry * (headerBlockSize + dataBlockSize) + buf.position());
+ if (byteRead == -1)
+ throw new
EOFException();
+ } while (buf.hasRemaining());
+ buf.flip();
lruValue = 0;
if(lruRAF.length() > ((entry + 1) * 8))
{
readLRU = true;
- lruRAF.seek(entry * 8);
- lruValue = lruRAF.readLong();
+ lruValue = fcReadLRU(entry);
}
if(keysRAF != null && keysRAF.length()
> ((entry + 1) * keyLength)) {
readKey = true;
- keysRAF.seek(entry * keyLength);
- keysRAF.readFully(keyBuf);
+ fcReadKey(entry, keyBuf);
}
} catch (EOFException e) {
System.err.println("Was reading
"+wantedBlock+" to write to "+unwantedBlock);
@@ -642,15 +653,16 @@
throw e;
}
entry = unwantedBlock.longValue();
- storeRAF.seek(entry * (headerBlockSize +
dataBlockSize));
- storeRAF.write(buf);
+ do {
+ int byteWritten = storeFC.write(buf,
entry * (headerBlockSize + dataBlockSize) + buf.position());
+ if (byteWritten == -1)
+ throw new EOFException();
+ } while (buf.hasRemaining());
if(readLRU) {
- lruRAF.seek(entry * 8);
- lruRAF.writeLong(lruValue);
+ fcWriteLRU(entry, lruValue);
}
if(readKey) {
- keysRAF.seek(entry * keyLength);
- keysRAF.write(keyBuf);
+ fcWriteKey(entry, keyBuf);
}
// Update the database w.r.t. the old block.
@@ -1158,11 +1170,7 @@
byte[] header = new byte[headerBlockSize];
byte[] data = new byte[dataBlockSize];
try {
- synchronized(storeRAF) {
-
storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
- storeRAF.readFully(header);
- storeRAF.readFully(data);
- }
+ fcReadStore(storeBlock.offset, header,
data);
} catch (EOFException e) {
Logger.error(this, "No block");
c.close();
@@ -1201,16 +1209,15 @@
keysDB.put(t,routingkeyDBE,blockDBE);
if(fullKey == null)
fullKey =
block.getFullKey();
- synchronized(storeRAF) {
+
if(keysRAF != null) {
-
keysRAF.seek(storeBlock.offset * keyLength);
-
keysRAF.write(fullKey);
+
fcWriteKey(storeBlock.offset, fullKey);
if(logDEBUG)
Logger.debug(this, "Written full key length "+fullKey.length+" to block
"+storeBlock.offset+" at "+(storeBlock.offset * keyLength)+" for "+callback);
} else if(logDEBUG) {
Logger.debug(this, "Not writing full key length "+fullKey.length+" for block
"+storeBlock.offset+" for "+callback);
}
- }
+
} catch (DatabaseException e) {
Logger.error(this, "Caught
database exception "+e+" while replacing element");
addFreeBlock(storeBlock.offset,
true, "Bogus key");
@@ -1237,10 +1244,7 @@
c = null;
t.commit();
t = null;
- synchronized(storeRAF) {
- lruRAF.seek(storeBlock.offset *
8);
-
lruRAF.writeLong(storeBlock.recentlyUsed);
- }
+ fcWriteLRU(storeBlock.offset,
storeBlock.recentlyUsed);
} else {
c.close();
c = null;
@@ -1258,16 +1262,14 @@
synchronized(this) {
misses++;
}
- synchronized(storeRAF) {
+
// Clear the key in the keys file.
byte[] buf = new byte[keyLength];
for(int i=0;i<buf.length;i++) buf[i] =
0; // FIXME unnecessary?
if(keysRAF != null) {
- keysRAF.seek(storeBlock.offset
* keyLength);
- keysRAF.write(buf);
+ fcWriteKey(storeBlock.offset,
buf);
}
- }
-
+
keysDB.delete(t, routingkeyDBE);
// Insert the block into the index with a
random key, so that it's part of the LRU.
@@ -1394,14 +1396,9 @@
StoreBlock storeBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(blockDBE);
- synchronized(storeRAF) {
-
storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
- storeRAF.write(header);
- storeRAF.write(data);
- if(keysRAF != null) {
- keysRAF.seek(storeBlock.offset *
keyLength);
- keysRAF.write(fullKey);
- }
+ fcWriteStore(storeBlock.offset, header, data);
+ if (keysRAF != null) {
+ fcWriteKey(storeBlock.offset, fullKey);
}
// Unlock record.
@@ -1522,22 +1519,17 @@
DatabaseEntry blockDBE = new DatabaseEntry();
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
keysDB.put(t,routingkeyDBE,blockDBE);
- synchronized(storeRAF) {
-
storeRAF.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
- storeRAF.write(header);
- storeRAF.write(data);
- lruRAF.seek(storeBlock.getOffset() * 8);
- lruRAF.writeLong(storeBlock.recentlyUsed);
- if(keysRAF != null) {
- keysRAF.seek(storeBlock.getOffset() *
keyLength);
- keysRAF.write(fullKey);
- }
+
+ fcWriteStore(storeBlock.getOffset(), header, data);
+ fcWriteLRU( storeBlock.getOffset(),storeBlock.recentlyUsed);
+ if (keysRAF != null)
+ fcWriteKey(storeBlock.getOffset(), fullKey);
+ synchronized (this) {
writes++;
}
}
private boolean writeNewBlock(long blockNum, byte[] header, byte[]
data, Transaction t, DatabaseEntry routingkeyDBE, byte[] fullKey) throws
DatabaseException, IOException {
- long byteOffset = blockNum*(dataBlockSize+headerBlockSize);
StoreBlock storeBlock = new StoreBlock(this, blockNum);
long lruValue = storeBlock.recentlyUsed;
DatabaseEntry blockDBE = new DatabaseEntry();
@@ -1562,31 +1554,20 @@
throw e;
}
}
- synchronized(storeRAF) {
- try {
- storeRAF.seek(byteOffset);
- } catch (IOException ioe) {
- if(byteOffset > (2l*1024*1024*1024)) {
- Logger.error(this, "Environment does
not support files bigger than 2 GB?");
- System.out.println("Environment does
not support files bigger than 2 GB? (exception to follow)");
- }
- Logger.error(this, "Caught IOException on
storeRAF.seek("+byteOffset+ ')');
- throw ioe;
- }
- storeRAF.write(header);
- storeRAF.write(data);
- lruRAF.seek(blockNum * 8);
- lruRAF.writeLong(lruValue);
+
+ fcWriteStore(blockNum, header, data);
+ fcWriteLRU(blockNum, lruValue);
if(keysRAF != null) {
- keysRAF.seek(blockNum * keyLength);
- keysRAF.write(fullKey);
+ fcWriteKey(blockNum, fullKey);
if(logDEBUG)
Logger.debug(this, "Written full key
length "+fullKey.length+" to block "+blockNum+" at "+(blockNum * keyLength)+"
for "+callback);
} else if(logDEBUG) {
Logger.debug(this, "Not writing full key length
"+fullKey.length+" for block "+blockNum+" for "+callback);
}
+ synchronized (this) {
writes++;
}
+
return true;
}
@@ -1802,7 +1783,7 @@
private static void flushAndCloseRAF(RandomAccessFile file) {
try {
if (file != null)
- file.getFD().sync();
+ file.getChannel().force(true);
} catch (IOException e) {
// ignore
}
@@ -2144,6 +2125,67 @@
return db;
}
+ private void fcWriteLRU(long entry, long data) throws IOException {
+ ByteBuffer bf = ByteBuffer.allocateDirect(8);
+ bf.putLong(data);
+ bf.flip();
+ do {
+ int byteWritten = lruFC.write(bf, entry * 8 +
bf.position());
+ if (byteWritten == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+ }
+ private long fcReadLRU(long entry) throws IOException {
+ ByteBuffer bf = ByteBuffer.allocateDirect(8);
+ do {
+ int byteRead = lruFC.read(bf, entry * 8 +
bf.position());
+ if (byteRead == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+ bf.flip();
+ return bf.getLong();
+ }
+ private void fcReadKey(long entry, byte[] data) throws IOException {
+ ByteBuffer bf = ByteBuffer.wrap(data);
+ do {
+ int byteRead = keysFC.read(bf, entry * keyLength +
bf.position());
+ if (byteRead == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+ }
+ private void fcWriteKey(long entry, byte[] data) throws IOException {
+ ByteBuffer bf = ByteBuffer.wrap(data);
+ do {
+ int byteWritten = keysFC.write(bf, entry * keyLength +
bf.position());
+ if (byteWritten == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+ }
+ private void fcWriteStore(long entry, byte[] header, byte[] data)
throws IOException {
+ ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize +
dataBlockSize);
+ bf.put(header);
+ bf.put(data);
+ bf.flip();
+ do {
+ int byteWritten = storeFC.write(bf, (headerBlockSize +
dataBlockSize) * entry + bf.position());
+ if (byteWritten == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+ }
+ private void fcReadStore(long entry,byte[] header, byte[] data ) throws
IOException {
+ ByteBuffer bf = ByteBuffer.allocateDirect(headerBlockSize +
dataBlockSize);
+
+ do {
+ int dataRead = storeFC.read(bf, (headerBlockSize +
dataBlockSize) * entry);
+ if (dataRead == -1)
+ throw new EOFException();
+ } while (bf.hasRemaining());
+
+ bf.flip();
+ bf.get(header);
+ bf.get(data);
+ }
+
public void handleOOM() throws Exception {
if (storeRAF != null)
storeRAF.getFD().sync();