Author: elecharny
Date: Fri Aug 17 15:56:44 2012
New Revision: 1374333
URL: http://svn.apache.org/viewvc?rev=1374333&view=rev
Log:
o Added the suffixes
o The journal thread now stops correctly when the queue has been fully flushed
o Dealing with various case of Journal and File name initialization
o The BTree now uses a path and a filename
o Increased the buffer used to write data on disk to speed up the flush
o Empty the journal when flushing the data instead of deleting it
Modified:
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
Modified:
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL:
http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1374333&r1=1374332&r2=1374333&view=diff
==============================================================================
---
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
(original)
+++
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
Fri Aug 17 15:56:44 2012
@@ -58,6 +58,12 @@ public class BTree<K, V>
/** The default journal name */
public static final String DEFAULT_JOURNAL = "mavibot.log";
+ /** The default data file suffix */
+ public static final String DATA_SUFFIX = ".data";
+
+ /** The default journal file suffix */
+ public static final String JOURNAL_SUFFIX = ".log";
+
/** A field used to generate new revisions in a thread safe way */
private AtomicLong revision;
@@ -90,6 +96,9 @@ public class BTree<K, V>
/** A flag set to true when the BTree is a in-memory BTree */
private boolean inMemory;
+ /** A flag used to tell the BTree that the journal is activated */
+ private boolean withJournal;
+
/** The associated journal. If null, this is an in-memory btree */
private File journal;
@@ -97,7 +106,7 @@ public class BTree<K, V>
private AtomicLong nbElems;
/** A lock used to protect the write operation against concurrent access */
- private final ReentrantLock writeLock = new ReentrantLock();
+ private ReentrantLock writeLock;
/** The thread responsible for the cleanup of timed out reads */
private Thread readTransactionsThread;
@@ -132,10 +141,14 @@ public class BTree<K, V>
while ( !Thread.currentThread().isInterrupted() )
{
long timeoutDate = System.currentTimeMillis() -
readTimeOut;
+ long t0 = System.currentTimeMillis();
+ int nbTxns = 0;
// Loop on all the transactions from the queue
while ( ( transaction = readTransactions.peek() ) !=
null )
{
+ nbTxns++;
+
if ( transaction.isClosed() )
{
// The transaction is already closed, remove
it from the queue
@@ -155,6 +168,13 @@ public class BTree<K, V>
break;
}
+ long t1 = System.currentTimeMillis();
+
+ if ( nbTxns > 0 )
+ {
+ System.out.println( "Processing old txn : " +
nbTxns + ", " + ( t1 - t0 ) + "ms" );
+ }
+
// Wait until we reach the timeout
Thread.sleep( readTimeOut );
}
@@ -186,52 +206,85 @@ public class BTree<K, V>
{
Runnable journalTask = new Runnable()
{
+ private boolean flushModification( FileChannel channel,
Modification<K, V> modification )
+ throws IOException
+ {
+ if ( modification instanceof Addition )
+ {
+ byte[] keyBuffer = serializer.serializeKey(
modification.getKey() );
+ ByteBuffer bb = ByteBuffer.allocateDirect(
keyBuffer.length + 1 );
+ bb.put( Modification.ADDITION );
+ bb.put( keyBuffer );
+ bb.flip();
+
+ channel.write( bb );
+
+ byte[] valueBuffer = serializer.serializeValue(
modification.getValue() );
+ bb = ByteBuffer.allocateDirect( valueBuffer.length );
+ bb.put( valueBuffer );
+ bb.flip();
+
+ channel.write( bb );
+ }
+ else if ( modification instanceof Deletion )
+ {
+ byte[] keyBuffer = serializer.serializeKey(
modification.getKey() );
+ ByteBuffer bb = ByteBuffer.allocateDirect(
keyBuffer.length + 1 );
+ bb.put( Modification.DELETION );
+ bb.put( keyBuffer );
+ bb.flip();
+
+ channel.write( bb );
+ }
+ else
+ // This is the poison pill, just exit
+ {
+ return false;
+ }
+
+ // Flush to the disk for real
+ channel.force( true );
+
+ return true;
+ }
+
+
public void run()
{
+ Modification<K, V> modification = null;
+ FileOutputStream stream;
+ FileChannel channel = null;
+
try
{
- FileOutputStream stream = new FileOutputStream( journal );
- FileChannel channel = stream.getChannel();
+ stream = new FileOutputStream( journal );
+ channel = stream.getChannel();
while ( !Thread.currentThread().isInterrupted() )
{
- Modification<K, V> modification =
modificationsQueue.take();
+ modification = modificationsQueue.take();
- if ( modification instanceof Addition )
- {
- byte[] keyBuffer = serializer.serializeKey(
modification.getKey() );
- ByteBuffer bb = ByteBuffer.allocateDirect(
keyBuffer.length + 1 );
- bb.put( Modification.ADDITION );
- bb.put( keyBuffer );
- bb.flip();
-
- channel.write( bb );
-
- byte[] valueBuffer = serializer.serializeValue(
modification.getValue() );
- bb = ByteBuffer.allocateDirect( valueBuffer.length
);
- bb.put( valueBuffer );
- bb.flip();
+ boolean stop = flushModification( channel,
modification );
- channel.write( bb );
- }
- else
+ if ( stop )
{
- byte[] keyBuffer = serializer.serializeKey(
modification.getKey() );
- ByteBuffer bb = ByteBuffer.allocateDirect(
keyBuffer.length + 1 );
- bb.put( Modification.DELETION );
- bb.put( keyBuffer );
- bb.flip();
-
- channel.write( bb );
+ break;
}
-
- // Flush to the disk for real
- channel.force( true );
}
}
catch ( InterruptedException ie )
{
//System.out.println( "Interrupted" );
+ while ( ( modification = modificationsQueue.peek() ) !=
null );
+
+ try
+ {
+ flushModification( channel, modification );
+ }
+ catch ( IOException ioe )
+ {
+ // There is little we can do here...
+ }
}
catch ( Exception e )
{
@@ -257,8 +310,29 @@ public class BTree<K, V>
String fileName = configuration.getFileName();
String journalName = configuration.getJournalName();
- File btreeFile = new File( configuration.getFilePath(), fileName );
- File journalFile = new File( configuration.getJournalPath(),
journalName );
+ if ( fileName == null )
+ {
+ inMemory = true;
+ }
+ else
+ {
+ file = new File( configuration.getFilePath(), fileName );
+
+ String journalPath = configuration.getJournalPath();
+
+ if ( journalPath == null )
+ {
+ journalPath = configuration.getFilePath();
+ }
+
+ if ( journalName == null )
+ {
+ journalName = fileName + JOURNAL_SUFFIX;
+ }
+
+ journal = new File( journalPath, journalName );
+ inMemory = false;
+ }
pageSize = configuration.getPageSize();
comparator = configuration.getComparator();
@@ -270,9 +344,6 @@ public class BTree<K, V>
throw new IllegalArgumentException( "Comparator should not be
null" );
}
- file = btreeFile;
- journal = journalFile;
-
// Now, initialize the BTree
init();
}
@@ -285,7 +356,7 @@ public class BTree<K, V>
*/
public BTree( Comparator<K> comparator ) throws IOException
{
- this( null, comparator, null, DEFAULT_PAGE_SIZE );
+ this( null, null, comparator, null, DEFAULT_PAGE_SIZE );
}
@@ -297,19 +368,20 @@ public class BTree<K, V>
*/
public BTree( Comparator<K> comparator, Serializer<K, V> serializer )
throws IOException
{
- this( null, comparator, serializer, DEFAULT_PAGE_SIZE );
+ this( null, null, comparator, serializer, DEFAULT_PAGE_SIZE );
}
/**
* Creates a new BTree with a default page size and a comparator, with an
associated file.
*
+ * @param path the File path
* @param file The file storing the BTree data
* @param comparator The comparator to use
*/
- public BTree( File file, Comparator<K> comparator ) throws IOException
+ public BTree( String path, String file, Comparator<K> comparator ) throws
IOException
{
- this( file, comparator, null, DEFAULT_PAGE_SIZE );
+ this( path, file, comparator, null, DEFAULT_PAGE_SIZE );
}
@@ -320,9 +392,9 @@ public class BTree<K, V>
* @param comparator The comparator to use
* @param serializer The serializer to use
*/
- public BTree( File file, Comparator<K> comparator, Serializer<K, V>
serializer ) throws IOException
+ public BTree( String path, String file, Comparator<K> comparator,
Serializer<K, V> serializer ) throws IOException
{
- this( file, comparator, serializer, DEFAULT_PAGE_SIZE );
+ this( path, file, comparator, serializer, DEFAULT_PAGE_SIZE );
}
@@ -334,7 +406,7 @@ public class BTree<K, V>
*/
public BTree( Comparator<K> comparator, int pageSize ) throws IOException
{
- this( null, comparator, null, pageSize );
+ this( null, null, comparator, null, pageSize );
}
@@ -347,20 +419,20 @@ public class BTree<K, V>
*/
public BTree( Comparator<K> comparator, Serializer<K, V> serializer, int
pageSize ) throws IOException
{
- this( null, comparator, serializer, pageSize );
+ this( null, null, comparator, serializer, pageSize );
}
/**
- * Creates a new in-memory BTree with a specific page size and a
comparator.
+ * Creates a new BTree with a specific page size and a comparator.
*
* @param file The file storing the BTree data
* @param comparator The comparator to use
* @param pageSize The number of elements we can store in a page
*/
- public BTree( File file, Comparator<K> comparator, int pageSize ) throws
IOException
+ public BTree( String path, String file, Comparator<K> comparator, int
pageSize ) throws IOException
{
- this( file, comparator, null, pageSize );
+ this( path, file, comparator, null, pageSize );
}
@@ -372,7 +444,7 @@ public class BTree<K, V>
* @param serializer The serializer to use
* @param pageSize The number of elements we can store in a page
*/
- public BTree( File file, Comparator<K> comparator, Serializer<K, V>
serializer, int pageSize )
+ public BTree( String path, String file, Comparator<K> comparator,
Serializer<K, V> serializer, int pageSize )
throws IOException
{
if ( comparator == null )
@@ -381,7 +453,34 @@ public class BTree<K, V>
}
this.comparator = comparator;
- this.file = file;
+
+ if ( ( path == null ) && ( file == null ) )
+ {
+ inMemory = true;
+ }
+ else
+ {
+ if ( new File( path, file ).exists() )
+ {
+ this.file = new File( path, file );
+ }
+ else
+ {
+ this.file = new File( path, file + DATA_SUFFIX );
+ }
+
+ if ( new File( path, file + JOURNAL_SUFFIX ).exists() )
+ {
+ this.journal = new File( path, file );
+ }
+ else
+ {
+ this.journal = new File( path, file + JOURNAL_SUFFIX );
+ }
+
+ inMemory = false;
+ }
+
setPageSize( pageSize );
this.serializer = serializer;
@@ -401,14 +500,9 @@ public class BTree<K, V>
readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
// Create the queue containing the modifications, if it's not a
in-memory btree
- if ( file != null )
+ if ( !inMemory )
{
modificationsQueue = new LinkedBlockingDeque<Modification<K, V>>();
- inMemory = false;
- }
- else
- {
- inMemory = true;
}
// Initialize the PageId counter
@@ -432,6 +526,7 @@ public class BTree<K, V>
}
nbElems = new AtomicLong( 0 );
+ writeLock = new ReentrantLock();
// Check the files and create them if missing
if ( file != null )
@@ -446,6 +541,7 @@ public class BTree<K, V>
}
journal.createNewFile();
+ withJournal = true;
}
else
{
@@ -461,6 +557,7 @@ public class BTree<K, V>
}
journal.createNewFile();
+ withJournal = true;
// If the journal is not empty, we have to read it
// and to apply all the modifications to the current file
@@ -470,12 +567,16 @@ public class BTree<K, V>
}
}
}
+ else
+ {
+ withJournal = false;
+ }
// Initialize the txnManager thread
createTransactionManager();
// Initialize the Journal manager thread if it's not a in-memory btree
- if ( !inMemory )
+ if ( !inMemory && withJournal )
{
createJournalManager();
}
@@ -493,8 +594,10 @@ public class BTree<K, V>
if ( !inMemory )
{
- // Stop the journal manager thread
- journalManagerThread.interrupt();
+ // Stop the journal manager thread, by injecting a poison pill into
+ // the queue this thread is using, so that all the epnding data
+ // will be written before it shuts down
+ modificationsQueue.add( new PoisonPill<K, V>() );
// Flush the data
flush();
@@ -918,7 +1021,9 @@ public class BTree<K, V>
pos += len;
bb.flip();
+
channel.write( bb );
+
bb.clear();
}
}
@@ -948,7 +1053,9 @@ public class BTree<K, V>
File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
FileOutputStream stream = new FileOutputStream( tmpFileFD );
FileChannel ch = stream.getChannel();
- ByteBuffer bb = ByteBuffer.allocateDirect( 65536 );
+
+ // Create a buffer containing 200 4Kb pages (around 1Mb)
+ ByteBuffer bb = ByteBuffer.allocateDirect( 4096 * 200 );
Cursor<K, V> cursor = browse();
@@ -1061,10 +1168,11 @@ public class BTree<K, V>
/**
* Read the data from the disk into this BTree. All the existing data in
the
* BTree are kept, the read data will be associated with a new revision.
+ *
* @param file
* @throws IOException
*/
- public void load( File file ) throws IOException
+ private void load( File file ) throws IOException
{
long revision = generateRevision();
@@ -1086,6 +1194,11 @@ public class BTree<K, V>
//List<K> keys = new ArrayList<K>();
//List<V> values = new ArrayList<V>();
+ // desactivate the journal while we load the file
+ boolean isJournalActivated = withJournal;
+
+ withJournal = false;
+
// Loop on all the elements, store them in lists atm
for ( long i = 0; i < nbElems; i++ )
{
@@ -1103,6 +1216,9 @@ public class BTree<K, V>
insert( key, value, revision );
}
+ // Restore the withJournal value
+ withJournal = isJournalActivated;
+
// Now, process the lists to create the btree
// TODO... BulkLoad
}
@@ -1119,8 +1235,11 @@ public class BTree<K, V>
// Then flush the file
flush( file );
- // And delete the journal
- journal.delete();
+ // And empty the journal
+ FileOutputStream stream = new FileOutputStream( journal );
+ FileChannel channel = stream.getChannel();
+ channel.position( 0 );
+ channel.force( true );
}
}
@@ -1144,12 +1263,35 @@ public class BTree<K, V>
/**
+ * @return the file
+ */
+ public File getFile()
+ {
+ return file;
+ }
+
+
+ /**
+ * @return the journal
+ */
+ public File getJournal()
+ {
+ return journal;
+ }
+
+
+ /**
* @see Object#toString()
*/
public String toString()
{
StringBuilder sb = new StringBuilder();
+ if ( inMemory )
+ {
+ sb.append( "In-memory " );
+ }
+
sb.append( "BTree" );
sb.append( "( pageSize:" ).append( pageSize );
@@ -1173,6 +1315,38 @@ public class BTree<K, V>
sb.append( comparator.getClass().getSimpleName() );
}
+ if ( !inMemory )
+ {
+ try
+ {
+ sb.append( ", file : " );
+
+ if ( file != null )
+ {
+ sb.append( file.getCanonicalPath() );
+ }
+ else
+ {
+ sb.append( "Unknown" );
+ }
+
+ sb.append( ", journal : " );
+
+ if ( journal != null )
+ {
+ sb.append( journal.getCanonicalPath() );
+ }
+ else
+ {
+ sb.append( "Unkown" );
+ }
+ }
+ catch ( IOException ioe )
+ {
+ // There is little we can do here...
+ }
+ }
+
sb.append( ") : \n" );
sb.append( rootPage.dumpPage( "" ) );
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]