Author: elecharny
Date: Mon Aug 13 13:51:24 2012
New Revision: 1372422

URL: http://svn.apache.org/viewvc?rev=1372422&view=rev
Log:
o Get rid of the rootPages Map, we don't need it.
o Added a thread used to cleanup the timed out transactions
o Added a close() method, used to kill the cleanup thread

Modified:
    labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java

Modified: 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL: 
http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1372422&r1=1372421&r2=1372422&view=diff
==============================================================================
--- 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java 
(original)
+++ 
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java 
Mon Aug 13 13:51:24 2012
@@ -30,8 +30,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Comparator;
 import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.mavibot.btree.serializer.BufferHandler;
@@ -64,8 +63,9 @@ public class BTree<K, V>
     /** The current rootPage */
     protected volatile Page<K, V> rootPage;
 
-    /** A map containing all the existing revisions */
-    private Map<Long, Page<K, V>> roots = new ConcurrentHashMap<Long, Page<K, 
V>>();
+    /** The list of read transactions being executed */
+    //private ConcurrentDoublyLinkedList<Transaction<K, V>> readTransactions;
+    private ConcurrentLinkedQueue<Transaction<K, V>> readTransactions;
 
     /** Number of entries in each Page. */
     protected int pageSize;
@@ -84,6 +84,69 @@ public class BTree<K, V>
     /** The number of elements in the current revision */
     private AtomicLong nbElems = new AtomicLong( 0 );
 
+    Thread readTransactionsThread;
+
+
+    /**
+     * Create a thread that is responsible of cleaning the transactions when
+     * they hit the timeout
+     */
+    private void createTransactionManager()
+    {
+        Runnable readTransactionTask = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    Transaction<K, V> transaction = null;
+
+                    while ( !Thread.currentThread().isInterrupted() )
+                    {
+                        long timeoutDate = System.currentTimeMillis() - 10000L;
+
+                        // Loop on all the transactions from the queue
+                        while ( ( transaction = readTransactions.peek() ) != 
null )
+                        {
+                            if ( transaction.isClosed() )
+                            {
+                                // The transaction is already closed, remove 
it from the queue
+                                readTransactions.poll();
+                                continue;
+                            }
+
+                            // Check if the transaction has timed out
+                            if ( transaction.getCreationDate() < timeoutDate )
+                            {
+                                transaction.close();
+                                readTransactions.poll();
+                                continue;
+                            }
+
+                            // We need to stop now
+                            break;
+                        }
+
+                        //System.out.println( "Sleep for 10 seconds" );
+                        Thread.sleep( 10000L );
+                    }
+                }
+                catch ( InterruptedException ie )
+                {
+                    //System.out.println( "Interrupted" );
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException( e );
+                }
+            }
+        };
+
+        readTransactionsThread = new Thread( readTransactionTask );
+        readTransactionsThread.setDaemon( true );
+        readTransactionsThread.start();
+    }
+
 
     /**
      * Creates a new in-memory BTree using the BTreeConfiguration to 
initialize the 
@@ -243,9 +306,9 @@ public class BTree<K, V>
      */
     public void init() throws IOException
     {
-
-        // Create the map containing all the revisions
-        roots = new ConcurrentHashMap<Long, Page<K, V>>();
+        // Create the queue containing the pending read transactions
+        //readTransactions = new ConcurrentDoublyLinkedList<Transaction<K, 
V>>();
+        readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
 
         // Initialize the PageId counter
         pageRecordIdGenerator = new AtomicLong( 0 );
@@ -256,7 +319,6 @@ public class BTree<K, V>
         // Create the first root page, with revision 0L. It will be empty
         // and increment the revision at the same time
         rootPage = new Leaf<K, V>( this );
-        roots.put( revision.getAndIncrement(), rootPage );
 
         // We will extract the Type to use for keys, using the comparator for 
that
         Class<?> comparatorClass = comparator.getClass();
@@ -267,6 +329,20 @@ public class BTree<K, V>
         {
             keyType = ( Class<?> ) argumentTypes[0];
         }
+
+        // Initialize the txnManager thread
+        createTransactionManager();
+    }
+
+
+    /**
+     * Close the BTree, cleaning up all the data structure
+     */
+    public void close()
+    {
+        readTransactionsThread.interrupt();
+        readTransactions.clear();
+        rootPage = null;
     }
 
 
@@ -319,7 +395,6 @@ public class BTree<K, V>
     /* No qualifier */void setRoot( Page<K, V> root )
     {
         rootPage = root;
-        roots.put( revision.getAndIncrement(), rootPage );
     }
 
 
@@ -452,9 +527,6 @@ public class BTree<K, V>
                 tuple = removeResult.getRemovedElement();
             }
 
-            // Save the rootPage into the roots with the new revision.
-            roots.put( revision, rootPage );
-
             // Return the value we have found if it was modified
             return tuple;
         }
@@ -493,7 +565,7 @@ public class BTree<K, V>
         Transaction<K, V> transaction = beginReadTransaction();
 
         // Fetch the root page for this revision
-        Page<K, V> root = roots.get( transaction.getRevision() );
+        Page<K, V> root = rootPage;
         Cursor<K, V> cursor = root.browse( key, transaction, new 
LinkedList<ParentPos<K, V>>() );
 
         return cursor;
@@ -511,7 +583,7 @@ public class BTree<K, V>
         Transaction<K, V> transaction = beginReadTransaction();
 
         // Fetch the root page for this revision
-        Page<K, V> root = roots.get( transaction.getRevision() );
+        Page<K, V> root = rootPage;
         LinkedList<ParentPos<K, V>> stack = new LinkedList<ParentPos<K, V>>();
 
         Cursor<K, V> cursor = root.browse( transaction, stack );
@@ -583,9 +655,6 @@ public class BTree<K, V>
                 rootPage = new Node<K, V>( this, revision, pivot, leftPage, 
rightPage );
             }
 
-            // Save the rootPage into the roots with the new revision.
-            roots.put( revision, rootPage );
-
             // Return the value we have found if it was modified
             return modifiedValue;
         }
@@ -602,11 +671,13 @@ public class BTree<K, V>
      * automatically closed after the timeout
      * @return The created transaction
      */
-    public Transaction<K, V> beginReadTransaction()
+    private Transaction<K, V> beginReadTransaction()
     {
         Transaction<K, V> readTransaction = new Transaction<K, V>( rootPage, 
revision.get() - 1,
             System.currentTimeMillis() );
 
+        readTransactions.add( readTransaction );
+
         return readTransaction;
     }
 
@@ -766,7 +837,7 @@ public class BTree<K, V>
 
         long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
 
-        // Prepare a list of keys and values rad from the disk
+        // Prepare a list of keys and values read from the disk
         //List<K> keys = new ArrayList<K>();
         //List<V> values = new ArrayList<V>();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to