Author: tubbie
Date: 2005-12-15 09:31:34 +0000 (Thu, 15 Dec 2005)
New Revision: 7710
Modified:
branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
Log:
The data and headers are now stored in a separate file
Modified: branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java
===================================================================
--- branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java 2005-12-12
22:27:27 UTC (rev 7709)
+++ branches/bdb/src/freenet/store/BerkelyDBFreenetStore.java 2005-12-15
09:31:34 UTC (rev 7710)
@@ -3,6 +3,7 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.RandomAccessFile;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -35,27 +36,31 @@
* @author tubbie
*
* TODO: Fix ugly Exception handling
+ * TODO: Don't use timestamps
*/
public class BerkelyDBFreenetStore implements FreenetStore {
+ static final int CHK_DATA_BLOCK_SIZE = 32*1024;
+ static final int CHK_HEADER_BLOCK_SIZE = 36;
+
private final Environment environment;
private final TupleBinding storeBlockTupleBinding;
private final TupleBinding longTupleBinding;
+
+ private int chkBlocksInStore;
+ private final int maxChkBlocks;
private final Database chkDB;
private final Database chkDB_accessTime;
+ private final RandomAccessFile chkStore;
private boolean closed = false;
- private final StoreDeleter storeDeleter;
-
- private final int maxBlocks;
-
/**
* Initializes database
* @param the directory where the store is located
* @throws FileNotFoundException if the dir does not exist and could not
be created
*/
- public BerkelyDBFreenetStore(String storeDir,int maxBlocks) throws
Exception
+ public BerkelyDBFreenetStore(String storeDir,int maxChkBlocks) throws
Exception
{
// Percentage of the database that must contain usefull data
// decrease to increase performance, increase to save disk space
@@ -67,7 +72,7 @@
// Percentage of the maximum heap size used as a cache
System.setProperty("je.maxMemoryPercent","30");
- this.maxBlocks=maxBlocks;
+ this.maxChkBlocks=maxChkBlocks;
// Initialize environment
EnvironmentConfig envConfig = new EnvironmentConfig();
@@ -77,8 +82,11 @@
File dir = new File(storeDir);
if(!dir.exists())
dir.mkdir();
+ File dbDir = new File(dir,"database");
+ if(!dbDir.exists())
+ dbDir.mkdir();
- environment = new Environment(new File(storeDir), envConfig);
+ environment = new Environment(dbDir, envConfig);
// Initialize CHK database
DatabaseConfig dbConfig = new DatabaseConfig();
@@ -99,12 +107,16 @@
chkDB_accessTime = environment.openSecondaryDatabase
(null,
"CHK_accessTime", chkDB, secDbConfig);
- // Initialize thread that deletes items when the store gets full
- storeDeleter = new StoreDeleter();
- (new Thread(storeDeleter,"BDBStore: StoreDeleter")).start();
+ // Initialize the store file
+ File storeFile = new File(dir,"store");
+ if(!storeFile.exists())
+ storeFile.createNewFile();
+ chkStore = new RandomAccessFile(storeFile,"rw");
// Add shutdownhook
Runtime.getRuntime().addShutdownHook(new ShutdownHook());
+
+ chkBlocksInStore = countCHKBlocks();
}
/**
@@ -127,6 +139,8 @@
if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.DEFAULT)
!=OperationStatus.SUCCESS) {
+ c.close();
+ t.abort();
return null;
}
@@ -134,8 +148,17 @@
CHKBlock block = null;
try{
- block = new
CHKBlock(storeBlock.getData(),storeBlock.getHeader(),chk);
+ byte[] header = new byte[CHK_HEADER_BLOCK_SIZE];
+ byte[] data = new byte[CHK_DATA_BLOCK_SIZE];
+ synchronized(chkStore) {
+
chkStore.seek(storeBlock.offset*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+ chkStore.read(header);
+ chkStore.read(data);
+ }
+
+ block = new CHKBlock(data,header,chk);
+
if(!dontPromote)
{
storeBlock.updateAccessTime();
@@ -144,24 +167,30 @@
c.putCurrent(updateDBE);
c.close();
t.commit();
+ }else{
+ c.close();
+ t.abort();
}
Logger.minor(this, "Get key: "+chk);
- Logger.minor(this, "Headers:
"+storeBlock.getHeader().length+" bytes, hash
"+Fields.hashCode(storeBlock.getHeader()));
- Logger.minor(this, "Data: "+storeBlock.getData().length+"
bytes, hash "+Fields.hashCode(storeBlock.getData()));
+ Logger.minor(this, "Headers: "+header.length+" bytes, hash
"+header);
+ Logger.minor(this, "Data: "+data.length+" bytes, hash
"+data);
}catch(CHKVerifyException ex){
- Logger.normal(this, "Does not verify, deleting: "+chk);
- c.delete();
+ Logger.normal(this, "Does not verify, setting
accessTime to 0 for : "+chk);
+ storeBlock.setAccessTime(0);
+ DatabaseEntry updateDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
updateDBE);
+ c.putCurrent(updateDBE);
c.close();
- t.abort();
- storeDeleter.deletedItem();
+ t.commit();
return null;
}
return block;
- }catch(DatabaseException ex) { // FIXME: ugly
+ }catch(Exception ex) { // FIXME: ugly
if(c!=null)
try{c.close();}catch(DatabaseException ex2){}
+ ex.printStackTrace();
new IOException(ex.getMessage());
}
@@ -175,28 +204,63 @@
{
if(closed)
return;
-
+
byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
byte[] data = block.getData();
byte[] header = block.getHeader();
- StoreBlock storeBlock = new StoreBlock(data,header);
-
+
+ if(data.length!=CHK_DATA_BLOCK_SIZE) {
+ Logger.minor(this, "This data is "+data.length+" bytes. Should
be "+CHK_DATA_BLOCK_SIZE);
+ return;
+ }
+ if(header.length!=CHK_HEADER_BLOCK_SIZE) {
+ Logger.minor(this, "This header is "+data.length+" bytes.
Should be "+CHK_HEADER_BLOCK_SIZE);
+ return;
+ }
+
Transaction t = null;
+
try{
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
-
- DatabaseEntry blockDBE = new DatabaseEntry();
- storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
-
- t = environment.beginTransaction(null,null);
+ t = environment.beginTransaction(null,null);
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+
+ synchronized(chkStore) {
+ if(chkBlocksInStore<maxChkBlocks) {
+ // Expand the store file
+ int byteOffset =
chkBlocksInStore*(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE);
+ StoreBlock storeBlock = new
StoreBlock(chkBlocksInStore);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock,
blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+ chkStore.seek(byteOffset);
+ chkStore.write(header);
+ chkStore.write(data);
+ t.commit();
+ chkBlocksInStore++;
+ }else{
+ // Overwrite an other block
+ Cursor c = chkDB_accessTime.openCursor(t,null);
+ DatabaseEntry keyDBE = new DatabaseEntry();
+ DatabaseEntry dataDBE = new DatabaseEntry();
+ c.getFirst(keyDBE,dataDBE,null);
+ StoreBlock oldStoreBlock = (StoreBlock)
storeBlockTupleBinding.entryToObject(dataDBE);
+ c.delete();
+ c.close();
+ StoreBlock storeBlock = new
StoreBlock(oldStoreBlock.getOffset());
+ DatabaseEntry blockDBE = new DatabaseEntry();
+
storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
+ chkDB.put(t,routingkeyDBE,blockDBE);
+
chkStore.seek(storeBlock.getOffset()*(long)(CHK_DATA_BLOCK_SIZE+CHK_HEADER_BLOCK_SIZE));
+ chkStore.write(header);
+ chkStore.write(data);
+ t.commit();
+ }
+ }
+
Logger.minor(this, "Put key: "+block.getKey());
Logger.minor(this, "Headers: "+header.length+" bytes, hash
"+Fields.hashCode(header));
Logger.minor(this, "Data: "+data.length+" bytes, hash
"+Fields.hashCode(data));
- chkDB.put(t,routingkeyDBE,blockDBE);
- t.commit();
-
- storeDeleter.addedItem();
-
+
}catch(Exception ex) { // FIXME: ugly
if(t!=null){
try{t.abort();}catch(DatabaseException ex2){};
@@ -208,31 +272,36 @@
private class StoreBlock
{
- private final byte[] data, header;
private long lastAccessed;
- public StoreBlock(byte[] data,byte[] header)
+ private int offset;
+
+ public StoreBlock(int offset)
{
- this(data,header,System.currentTimeMillis());
+ this(offset,System.currentTimeMillis());
}
- public StoreBlock(byte[] data,byte[] header,long lastAccessed)
+ public StoreBlock(int offset,long lastAccessed)
{
- this.data=data;
- this.header=header;
+ this.offset = offset;
this.lastAccessed = lastAccessed;
}
-
- public byte[] getData() {return data;}
- public byte[] getHeader() {return header;}
-
+
public void updateAccessTime() {
lastAccessed = System.currentTimeMillis();
}
- public long getLastAccessed()
- {
+ public long getLastAccessed() {
return lastAccessed;
}
+
+ public void setAccessTime(long time)
+ {
+ lastAccessed = time;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
}
/**
@@ -244,41 +313,17 @@
public void objectToEntry(Object object, TupleOutput to) {
StoreBlock myData = (StoreBlock)object;
- writeByteArray(myData.getHeader(),to);
- writeByteArray(myData.getData(),to);
+ to.writeInt(myData.getOffset());
to.writeLong(myData.getLastAccessed());
-
}
public Object entryToObject(TupleInput ti) {
- byte[] header = readByteArray(ti);
- byte[] data = readByteArray(ti);
+ int offset = ti.readInt();
long lastAccessed = ti.readLong();
- StoreBlock storeBlock = new
StoreBlock(data,header,lastAccessed);
+ StoreBlock storeBlock = new StoreBlock(offset,lastAccessed);
return storeBlock;
}
-
- private byte[] readByteArray(TupleInput ti) {
- try{
- char size = ti.readChar();
- byte[] data = new byte[size];
- char read = 0;
- while(read<size)
- read +=ti.read(data,read,size-read);
-
- return data;
- }catch(IOException ex){}
- return null;
- }
-
- private void writeByteArray(byte[] data,TupleOutput to)
- {
- try {
- to.writeChar(data.length);
- to.write(data);
- }catch(IOException ex){}
- }
}
/**
@@ -303,85 +348,6 @@
}
}
- /**
- * Cleans items from the store when it's full
- */
- private class StoreDeleter implements Runnable
- {
- private boolean running=true;
- private boolean stopped=false;
- private int currentItems;
-
- public StoreDeleter() {
- currentItems = countCHKBlocks();
- }
-
-
-
- public synchronized void addedItem() {
- currentItems++;
- this.notify();
- }
-
- public synchronized void deletedItem() {
- currentItems--;
- }
-
- public void run() {
- while(running){
- synchronized(this) {
- while(currentItems < maxBlocks && running)
- try{wait();}catch(InterruptedException
ex){}
- }
- while(currentItems>maxBlocks){
- removeOldestItem();
- }
- }
-
- stopped=true;
- synchronized(this){
- this.notify();
- }
- }
-
- private void removeOldestItem() {
- Logger.normal(this, "Deleting oldest item in store");
- Cursor c = null;
- try{
- Transaction t = environment.beginTransaction(null,null);
- c = chkDB_accessTime.openCursor(t,null);
-
- DatabaseEntry keyDBE = new DatabaseEntry();
- DatabaseEntry dataDBE = new DatabaseEntry();
-
-
if(c.getFirst(keyDBE,dataDBE,null).equals(OperationStatus.SUCCESS))
- {
- c.delete();
- c.close();
- t.commit();
- deletedItem();
- }else{
- t.abort();
- }
-
-
- }catch(Exception ex) {
- ex.printStackTrace();
- if(c!=null)
- try{c.close();}catch(DatabaseException ex2){}
- }
- }
-
- public synchronized void close() {
- running = false;
- this.notify();
-
- // Block till we actually closed
- while(!stopped)
- try{wait();}catch(InterruptedException ex){}
- }
- }
-
private class ShutdownHook extends Thread
{
public void run() {
@@ -396,10 +362,10 @@
// This is nothing too problematic however
since the worst thing that should
// happen is that we miss the last few
store()'s and get an exception.
Logger.minor(this, "Closing database.");
- storeDeleter.close();
closed=true;
// Give all threads some time to complete
- Thread.sleep(3000);
+ Thread.sleep(5000);
+ chkStore.close();
chkDB_accessTime.close();
chkDB.close();
environment.close();