Author: elecharny
Date: Mon Aug 13 13:51:24 2012
New Revision: 1372422
URL: http://svn.apache.org/viewvc?rev=1372422&view=rev
Log:
o Get rid of the rootPages Map, we don't need it.
o Added a thread used to cleanup the timed out transactions
o Added a close() method, used to kill the cleanup thread
Modified:
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
Modified:
labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java
URL: http://svn.apache.org/viewvc/labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java?rev=1372422&r1=1372421&r2=1372422&view=diff
==============================================================================
--- labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java (original)
+++ labs/mavibot/trunk/mavibot/src/main/java/org/apache/mavibot/btree/BTree.java Mon Aug 13 13:51:24 2012
@@ -30,8 +30,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Comparator;
import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.mavibot.btree.serializer.BufferHandler;
@@ -64,8 +63,9 @@ public class BTree<K, V>
/** The current rootPage */
protected volatile Page<K, V> rootPage;
- /** A map containing all the existing revisions */
- private Map<Long, Page<K, V>> roots = new ConcurrentHashMap<Long, Page<K, V>>();
+ /** The list of read transactions being executed */
+ //private ConcurrentDoublyLinkedList<Transaction<K, V>> readTransactions;
+ private ConcurrentLinkedQueue<Transaction<K, V>> readTransactions;
/** Number of entries in each Page. */
protected int pageSize;
@@ -84,6 +84,69 @@ public class BTree<K, V>
/** The number of elements in the current revision */
private AtomicLong nbElems = new AtomicLong( 0 );
+ Thread readTransactionsThread;
+
+
+ /**
+ * Create a thread that is responsible of cleaning the transactions when
+ * they hit the timeout
+ */
+ private void createTransactionManager()
+ {
+ Runnable readTransactionTask = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Transaction<K, V> transaction = null;
+
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ long timeoutDate = System.currentTimeMillis() - 10000L;
+
+ // Loop on all the transactions from the queue
+ while ( ( transaction = readTransactions.peek() ) != null )
+ {
+ if ( transaction.isClosed() )
+ {
+ // The transaction is already closed, remove it from the queue
+ readTransactions.poll();
+ continue;
+ }
+
+ // Check if the transaction has timed out
+ if ( transaction.getCreationDate() < timeoutDate )
+ {
+ transaction.close();
+ readTransactions.poll();
+ continue;
+ }
+
+ // We need to stop now
+ break;
+ }
+
+ //System.out.println( "Sleep for 10 seconds" );
+ Thread.sleep( 10000L );
+ }
+ }
+ catch ( InterruptedException ie )
+ {
+ //System.out.println( "Interrupted" );
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+ };
+
+ readTransactionsThread = new Thread( readTransactionTask );
+ readTransactionsThread.setDaemon( true );
+ readTransactionsThread.start();
+ }
+
/**
* Creates a new in-memory BTree using the BTreeConfiguration to initialize the
@@ -243,9 +306,9 @@ public class BTree<K, V>
*/
public void init() throws IOException
{
-
- // Create the map containing all the revisions
- roots = new ConcurrentHashMap<Long, Page<K, V>>();
+ // Create the queue containing the pending read transactions
+ //readTransactions = new ConcurrentDoublyLinkedList<Transaction<K, V>>();
+ readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
// Initialize the PageId counter
pageRecordIdGenerator = new AtomicLong( 0 );
@@ -256,7 +319,6 @@ public class BTree<K, V>
// Create the first root page, with revision 0L. It will be empty
// and increment the revision at the same time
rootPage = new Leaf<K, V>( this );
- roots.put( revision.getAndIncrement(), rootPage );
// We will extract the Type to use for keys, using the comparator for that
Class<?> comparatorClass = comparator.getClass();
@@ -267,6 +329,20 @@ public class BTree<K, V>
{
keyType = ( Class<?> ) argumentTypes[0];
}
+
+ // Initialize the txnManager thread
+ createTransactionManager();
+ }
+
+
+ /**
+ * Close the BTree, cleaning up all the data structure
+ */
+ public void close()
+ {
+ readTransactionsThread.interrupt();
+ readTransactions.clear();
+ rootPage = null;
}
@@ -319,7 +395,6 @@ public class BTree<K, V>
/* No qualifier */void setRoot( Page<K, V> root )
{
rootPage = root;
- roots.put( revision.getAndIncrement(), rootPage );
}
@@ -452,9 +527,6 @@ public class BTree<K, V>
tuple = removeResult.getRemovedElement();
}
- // Save the rootPage into the roots with the new revision.
- roots.put( revision, rootPage );
-
// Return the value we have found if it was modified
return tuple;
}
@@ -493,7 +565,7 @@ public class BTree<K, V>
Transaction<K, V> transaction = beginReadTransaction();
// Fetch the root page for this revision
- Page<K, V> root = roots.get( transaction.getRevision() );
+ Page<K, V> root = rootPage;
Cursor<K, V> cursor = root.browse( key, transaction, new LinkedList<ParentPos<K, V>>() );
return cursor;
@@ -511,7 +583,7 @@ public class BTree<K, V>
Transaction<K, V> transaction = beginReadTransaction();
// Fetch the root page for this revision
- Page<K, V> root = roots.get( transaction.getRevision() );
+ Page<K, V> root = rootPage;
LinkedList<ParentPos<K, V>> stack = new LinkedList<ParentPos<K, V>>();
Cursor<K, V> cursor = root.browse( transaction, stack );
@@ -583,9 +655,6 @@ public class BTree<K, V>
rootPage = new Node<K, V>( this, revision, pivot, leftPage, rightPage );
}
- // Save the rootPage into the roots with the new revision.
- roots.put( revision, rootPage );
-
// Return the value we have found if it was modified
return modifiedValue;
}
@@ -602,11 +671,13 @@ public class BTree<K, V>
* automatically closed after the timeout
* @return The created transaction
*/
- public Transaction<K, V> beginReadTransaction()
+ private Transaction<K, V> beginReadTransaction()
{
Transaction<K, V> readTransaction = new Transaction<K, V>( rootPage, revision.get() - 1,
System.currentTimeMillis() );
+ readTransactions.add( readTransaction );
+
return readTransaction;
}
@@ -766,7 +837,7 @@ public class BTree<K, V>
long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
- // Prepare a list of keys and values rad from the disk
+ // Prepare a list of keys and values read from the disk
//List<K> keys = new ArrayList<K>();
//List<V> values = new ArrayList<V>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]