Author: elecharny
Date: Fri Aug 17 15:56:44 2012
New Revision: 1374333

URL: http://svn.apache.org/viewvc?rev=1374333&view=rev
Log:
o Added the suffixes
o The journal thread now stops correctly when the queue has been fully flushed
o Dealing with various case of Journal and File name initialization
o The BTree now uses a path and a filename
o Increased the buffer used to write data on disk to speed up the flush
o Empty the journal when flushing the data instead of deleting it

Modified:
    labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java

Modified: 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL: 
http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1374333&r1=1374332&r2=1374333&view=diff
==============================================================================
--- 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java 
(original)
+++ 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java 
Fri Aug 17 15:56:44 2012
@@ -58,6 +58,12 @@ public class BTree<K, V>
     /** The default journal name */
     public static final String DEFAULT_JOURNAL = "mavibot.log";
 
+    /** The default data file suffix */
+    public static final String DATA_SUFFIX = ".data";
+
+    /** The default journal file suffix */
+    public static final String JOURNAL_SUFFIX = ".log";
+
     /** A field used to generate new revisions in a thread safe way */
     private AtomicLong revision;
 
@@ -90,6 +96,9 @@ public class BTree<K, V>
     /** A flag set to true when the BTree is a in-memory BTree */
     private boolean inMemory;
 
+    /** A flag used to tell the BTree that the journal is activated */
+    private boolean withJournal;
+
     /** The associated journal. If null, this is an in-memory btree  */
     private File journal;
 
@@ -97,7 +106,7 @@ public class BTree<K, V>
     private AtomicLong nbElems;
 
     /** A lock used to protect the write operation against concurrent access */
-    private final ReentrantLock writeLock = new ReentrantLock();
+    private ReentrantLock writeLock;
 
     /** The thread responsible for the cleanup of timed out reads */
     private Thread readTransactionsThread;
@@ -132,10 +141,14 @@ public class BTree<K, V>
                     while ( !Thread.currentThread().isInterrupted() )
                     {
                         long timeoutDate = System.currentTimeMillis() - 
readTimeOut;
+                        long t0 = System.currentTimeMillis();
+                        int nbTxns = 0;
 
                         // Loop on all the transactions from the queue
                         while ( ( transaction = readTransactions.peek() ) != 
null )
                         {
+                            nbTxns++;
+
                             if ( transaction.isClosed() )
                             {
                                 // The transaction is already closed, remove 
it from the queue
@@ -155,6 +168,13 @@ public class BTree<K, V>
                             break;
                         }
 
+                        long t1 = System.currentTimeMillis();
+
+                        if ( nbTxns > 0 )
+                        {
+                            System.out.println( "Processing old txn : " + 
nbTxns + ", " + ( t1 - t0 ) + "ms" );
+                        }
+
                         // Wait until we reach the timeout
                         Thread.sleep( readTimeOut );
                     }
@@ -186,52 +206,85 @@ public class BTree<K, V>
     {
         Runnable journalTask = new Runnable()
         {
+            private boolean flushModification( FileChannel channel, 
Modification<K, V> modification )
+                throws IOException
+            {
+                if ( modification instanceof Addition )
+                {
+                    byte[] keyBuffer = serializer.serializeKey( 
modification.getKey() );
+                    ByteBuffer bb = ByteBuffer.allocateDirect( 
keyBuffer.length + 1 );
+                    bb.put( Modification.ADDITION );
+                    bb.put( keyBuffer );
+                    bb.flip();
+
+                    channel.write( bb );
+
+                    byte[] valueBuffer = serializer.serializeValue( 
modification.getValue() );
+                    bb = ByteBuffer.allocateDirect( valueBuffer.length );
+                    bb.put( valueBuffer );
+                    bb.flip();
+
+                    channel.write( bb );
+                }
+                else if ( modification instanceof Deletion )
+                {
+                    byte[] keyBuffer = serializer.serializeKey( 
modification.getKey() );
+                    ByteBuffer bb = ByteBuffer.allocateDirect( 
keyBuffer.length + 1 );
+                    bb.put( Modification.DELETION );
+                    bb.put( keyBuffer );
+                    bb.flip();
+
+                    channel.write( bb );
+                }
+                else
+                // This is the poison pill, just exit
+                {
+                    return false;
+                }
+
+                // Flush to the disk for real
+                channel.force( true );
+
+                return true;
+            }
+
+
             public void run()
             {
+                Modification<K, V> modification = null;
+                FileOutputStream stream;
+                FileChannel channel = null;
+
                 try
                 {
-                    FileOutputStream stream = new FileOutputStream( journal );
-                    FileChannel channel = stream.getChannel();
+                    stream = new FileOutputStream( journal );
+                    channel = stream.getChannel();
 
                     while ( !Thread.currentThread().isInterrupted() )
                     {
-                        Modification<K, V> modification = 
modificationsQueue.take();
+                        modification = modificationsQueue.take();
 
-                        if ( modification instanceof Addition )
-                        {
-                            byte[] keyBuffer = serializer.serializeKey( 
modification.getKey() );
-                            ByteBuffer bb = ByteBuffer.allocateDirect( 
keyBuffer.length + 1 );
-                            bb.put( Modification.ADDITION );
-                            bb.put( keyBuffer );
-                            bb.flip();
-
-                            channel.write( bb );
-
-                            byte[] valueBuffer = serializer.serializeValue( 
modification.getValue() );
-                            bb = ByteBuffer.allocateDirect( valueBuffer.length 
);
-                            bb.put( valueBuffer );
-                            bb.flip();
+                        boolean stop = flushModification( channel, 
modification );
 
-                            channel.write( bb );
-                        }
-                        else
+                        if ( stop )
                         {
-                            byte[] keyBuffer = serializer.serializeKey( 
modification.getKey() );
-                            ByteBuffer bb = ByteBuffer.allocateDirect( 
keyBuffer.length + 1 );
-                            bb.put( Modification.DELETION );
-                            bb.put( keyBuffer );
-                            bb.flip();
-
-                            channel.write( bb );
+                            break;
                         }
-
-                        // Flush to the disk for real
-                        channel.force( true );
                     }
                 }
                 catch ( InterruptedException ie )
                 {
                     //System.out.println( "Interrupted" );
+                    while ( ( modification = modificationsQueue.peek() ) != 
null );
+
+                    try
+                    {
+                        flushModification( channel, modification );
+                    }
+                    catch ( IOException ioe )
+                    {
+                        // There is little we can do here...
+                    }
                 }
                 catch ( Exception e )
                 {
@@ -257,8 +310,29 @@ public class BTree<K, V>
         String fileName = configuration.getFileName();
         String journalName = configuration.getJournalName();
 
-        File btreeFile = new File( configuration.getFilePath(), fileName );
-        File journalFile = new File( configuration.getJournalPath(), 
journalName );
+        if ( fileName == null )
+        {
+            inMemory = true;
+        }
+        else
+        {
+            file = new File( configuration.getFilePath(), fileName );
+
+            String journalPath = configuration.getJournalPath();
+
+            if ( journalPath == null )
+            {
+                journalPath = configuration.getFilePath();
+            }
+
+            if ( journalName == null )
+            {
+                journalName = fileName + JOURNAL_SUFFIX;
+            }
+
+            journal = new File( journalPath, journalName );
+            inMemory = false;
+        }
 
         pageSize = configuration.getPageSize();
         comparator = configuration.getComparator();
@@ -270,9 +344,6 @@ public class BTree<K, V>
             throw new IllegalArgumentException( "Comparator should not be 
null" );
         }
 
-        file = btreeFile;
-        journal = journalFile;
-
         // Now, initialize the BTree
         init();
     }
@@ -285,7 +356,7 @@ public class BTree<K, V>
      */
     public BTree( Comparator<K> comparator ) throws IOException
     {
-        this( null, comparator, null, DEFAULT_PAGE_SIZE );
+        this( null, null, comparator, null, DEFAULT_PAGE_SIZE );
     }
 
 
@@ -297,19 +368,20 @@ public class BTree<K, V>
      */
     public BTree( Comparator<K> comparator, Serializer<K, V> serializer ) 
throws IOException
     {
-        this( null, comparator, serializer, DEFAULT_PAGE_SIZE );
+        this( null, null, comparator, serializer, DEFAULT_PAGE_SIZE );
     }
 
 
     /**
      * Creates a new BTree with a default page size and a comparator, with an 
associated file.
      * 
+     * @param path the File path
      * @param file The file storing the BTree data
      * @param comparator The comparator to use
      */
-    public BTree( File file, Comparator<K> comparator ) throws IOException
+    public BTree( String path, String file, Comparator<K> comparator ) throws 
IOException
     {
-        this( file, comparator, null, DEFAULT_PAGE_SIZE );
+        this( path, file, comparator, null, DEFAULT_PAGE_SIZE );
     }
 
 
@@ -320,9 +392,9 @@ public class BTree<K, V>
      * @param comparator The comparator to use
      * @param serializer The serializer to use
      */
-    public BTree( File file, Comparator<K> comparator, Serializer<K, V> 
serializer ) throws IOException
+    public BTree( String path, String file, Comparator<K> comparator, 
Serializer<K, V> serializer ) throws IOException
     {
-        this( file, comparator, serializer, DEFAULT_PAGE_SIZE );
+        this( path, file, comparator, serializer, DEFAULT_PAGE_SIZE );
     }
 
 
@@ -334,7 +406,7 @@ public class BTree<K, V>
      */
     public BTree( Comparator<K> comparator, int pageSize ) throws IOException
     {
-        this( null, comparator, null, pageSize );
+        this( null, null, comparator, null, pageSize );
     }
 
 
@@ -347,20 +419,20 @@ public class BTree<K, V>
      */
     public BTree( Comparator<K> comparator, Serializer<K, V> serializer, int 
pageSize ) throws IOException
     {
-        this( null, comparator, serializer, pageSize );
+        this( null, null, comparator, serializer, pageSize );
     }
 
 
     /**
-     * Creates a new in-memory BTree with a specific page size and a 
comparator.
+     * Creates a new BTree with a specific page size and a comparator.
      * 
      * @param file The file storing the BTree data
      * @param comparator The comparator to use
      * @param pageSize The number of elements we can store in a page
      */
-    public BTree( File file, Comparator<K> comparator, int pageSize ) throws 
IOException
+    public BTree( String path, String file, Comparator<K> comparator, int 
pageSize ) throws IOException
     {
-        this( file, comparator, null, pageSize );
+        this( path, file, comparator, null, pageSize );
     }
 
 
@@ -372,7 +444,7 @@ public class BTree<K, V>
      * @param serializer The serializer to use
      * @param pageSize The number of elements we can store in a page
      */
-    public BTree( File file, Comparator<K> comparator, Serializer<K, V> 
serializer, int pageSize )
+    public BTree( String path, String file, Comparator<K> comparator, 
Serializer<K, V> serializer, int pageSize )
         throws IOException
     {
         if ( comparator == null )
@@ -381,7 +453,34 @@ public class BTree<K, V>
         }
 
         this.comparator = comparator;
-        this.file = file;
+
+        if ( ( path == null ) && ( file == null ) )
+        {
+            inMemory = true;
+        }
+        else
+        {
+            if ( new File( path, file ).exists() )
+            {
+                this.file = new File( path, file );
+            }
+            else
+            {
+                this.file = new File( path, file + DATA_SUFFIX );
+            }
+
+            if ( new File( path, file + JOURNAL_SUFFIX ).exists() )
+            {
+                this.journal = new File( path, file );
+            }
+            else
+            {
+                this.journal = new File( path, file + JOURNAL_SUFFIX );
+            }
+
+            inMemory = false;
+        }
+
         setPageSize( pageSize );
         this.serializer = serializer;
 
@@ -401,14 +500,9 @@ public class BTree<K, V>
         readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
 
         // Create the queue containing the modifications, if it's not a 
in-memory btree
-        if ( file != null )
+        if ( !inMemory )
         {
             modificationsQueue = new LinkedBlockingDeque<Modification<K, V>>();
-            inMemory = false;
-        }
-        else
-        {
-            inMemory = true;
         }
 
         // Initialize the PageId counter
@@ -432,6 +526,7 @@ public class BTree<K, V>
         }
 
         nbElems = new AtomicLong( 0 );
+        writeLock = new ReentrantLock();
 
         // Check the files and create them if missing
         if ( file != null )
@@ -446,6 +541,7 @@ public class BTree<K, V>
                 }
 
                 journal.createNewFile();
+                withJournal = true;
             }
             else
             {
@@ -461,6 +557,7 @@ public class BTree<K, V>
                 }
 
                 journal.createNewFile();
+                withJournal = true;
 
                 // If the journal is not empty, we have to read it
                 // and to apply all the modifications to the current file
@@ -470,12 +567,16 @@ public class BTree<K, V>
                 }
             }
         }
+        else
+        {
+            withJournal = false;
+        }
 
         // Initialize the txnManager thread
         createTransactionManager();
 
         // Initialize the Journal manager thread if it's not a in-memory btree
-        if ( !inMemory )
+        if ( !inMemory && withJournal )
         {
             createJournalManager();
         }
@@ -493,8 +594,10 @@ public class BTree<K, V>
 
         if ( !inMemory )
         {
-            // Stop the journal manager thread
-            journalManagerThread.interrupt();
+            // Stop the journal manager thread, by injecting a poison pill into
+            // the queue this thread is using, so that all the epnding data
+            // will be written before it shuts down
+            modificationsQueue.add( new PoisonPill<K, V>() );
 
             // Flush the data
             flush();
@@ -918,7 +1021,9 @@ public class BTree<K, V>
                 pos += len;
 
                 bb.flip();
+
                 channel.write( bb );
+
                 bb.clear();
             }
         }
@@ -948,7 +1053,9 @@ public class BTree<K, V>
         File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
         FileOutputStream stream = new FileOutputStream( tmpFileFD );
         FileChannel ch = stream.getChannel();
-        ByteBuffer bb = ByteBuffer.allocateDirect( 65536 );
+
+        // Create a buffer containing 200 4Kb pages (around 1Mb)
+        ByteBuffer bb = ByteBuffer.allocateDirect( 4096 * 200 );
 
         Cursor<K, V> cursor = browse();
 
@@ -1061,10 +1168,11 @@ public class BTree<K, V>
     /**
      * Read the data from the disk into this BTree. All the existing data in 
the 
      * BTree are kept, the read data will be associated with a new revision.
+     * 
      * @param file
      * @throws IOException
      */
-    public void load( File file ) throws IOException
+    private void load( File file ) throws IOException
     {
         long revision = generateRevision();
 
@@ -1086,6 +1194,11 @@ public class BTree<K, V>
         //List<K> keys = new ArrayList<K>();
         //List<V> values = new ArrayList<V>();
 
+        // desactivate the journal while we load the file
+        boolean isJournalActivated = withJournal;
+
+        withJournal = false;
+
         // Loop on all the elements, store them in lists atm
         for ( long i = 0; i < nbElems; i++ )
         {
@@ -1103,6 +1216,9 @@ public class BTree<K, V>
             insert( key, value, revision );
         }
 
+        // Restore the withJournal value
+        withJournal = isJournalActivated;
+
         // Now, process the lists to create the btree
         // TODO... BulkLoad
     }
@@ -1119,8 +1235,11 @@ public class BTree<K, V>
             // Then flush the file
             flush( file );
 
-            // And delete the journal
-            journal.delete();
+            // And empty the journal
+            FileOutputStream stream = new FileOutputStream( journal );
+            FileChannel channel = stream.getChannel();
+            channel.position( 0 );
+            channel.force( true );
         }
     }
 
@@ -1144,12 +1263,35 @@ public class BTree<K, V>
 
 
     /**
+     * @return the file
+     */
+    public File getFile()
+    {
+        return file;
+    }
+
+
+    /**
+     * @return the journal
+     */
+    public File getJournal()
+    {
+        return journal;
+    }
+
+
+    /**
      * @see Object#toString()
      */
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
 
+        if ( inMemory )
+        {
+            sb.append( "In-memory " );
+        }
+
         sb.append( "BTree" );
         sb.append( "( pageSize:" ).append( pageSize );
 
@@ -1173,6 +1315,38 @@ public class BTree<K, V>
             sb.append( comparator.getClass().getSimpleName() );
         }
 
+        if ( !inMemory )
+        {
+            try
+            {
+                sb.append( ", file : " );
+
+                if ( file != null )
+                {
+                    sb.append( file.getCanonicalPath() );
+                }
+                else
+                {
+                    sb.append( "Unknown" );
+                }
+
+                sb.append( ", journal : " );
+
+                if ( journal != null )
+                {
+                    sb.append( journal.getCanonicalPath() );
+                }
+                else
+                {
+                    sb.append( "Unkown" );
+                }
+            }
+            catch ( IOException ioe )
+            {
+                // There is little we can do here...
+            }
+        }
+
         sb.append( ") : \n" );
         sb.append( rootPage.dumpPage( "" ) );
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to