Author: jbellis
Date: Wed Apr 15 20:30:43 2009
New Revision: 765343
URL: http://svn.apache.org/viewvc?rev=765343&view=rev
Log:
Move from one ExecutorService (ES) per ColumnFamily to one ES per Memtable.
This allows us to wait for the ES to quiesce completely before flushing,
preventing the possibility of ConcurrentModificationException when a get
scheduled before the switch executes concurrently with flush. It also provides
a simpler mental model (only one thread touches a memtable at a time, period),
which is a valuable property. Finally, it is slightly more performant since it
avoids hashing the CF name for each operation.
Patch by jbellis; reviewed by Todd Lipcon for #9
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=765343&r1=765342&r2=765343&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Apr 15 20:30:43 2009
@@ -32,7 +32,7 @@
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
*/
-public final class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
private static Logger logger_ =
Logger.getLogger(DebuggableThreadPoolExecutor.class);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=765343&r1=765342&r2=765343&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Wed Apr 15 20:30:43 2009
@@ -18,15 +18,12 @@
package org.apache.cassandra.db;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -34,7 +31,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -49,6 +45,7 @@
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.service.IPartitioner;
import org.apache.cassandra.service.StorageService;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
@@ -57,18 +54,19 @@
public class Memtable implements Comparable<Memtable>
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
- private static Map<String, ExecutorService> apartments_ = new
HashMap<String, ExecutorService>();
+ private static Set<ExecutorService> runningExecutorServices_ = new
NonBlockingHashSet<ExecutorService>();
public static final String flushKey_ = "FlushKey";
-
+
public static void shutdown()
{
- Set<String> names = apartments_.keySet();
- for (String name : names)
- {
- apartments_.get(name).shutdownNow();
- }
+ for (ExecutorService exs : runningExecutorServices_)
+ {
+ exs.shutdownNow();
+ }
}
+ private MemtableThreadPoolExecutor executor_;
+
private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
private int thresholdCount_ =
DatabaseDescriptor.getMemtableObjectCount()*1024*1024;
private AtomicInteger currentSize_ = new AtomicInteger(0);
@@ -79,23 +77,15 @@
private String cfName_;
/* Creation time of this Memtable */
private long creationTime_;
- private boolean isFrozen_ = false;
+ private volatile boolean isFrozen_ = false;
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String,
ColumnFamily>();
/* Lock and Condition for notifying new clients about Memtable switches */
Lock lock_ = new ReentrantLock();
Memtable(String table, String cfName)
{
- if ( apartments_.get(cfName) == null )
- {
- apartments_.put(cfName, new DebuggableThreadPoolExecutor( 1,
- 1,
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryImpl("FAST-MEMTABLE-POOL")
- ));
- }
+ executor_ = new MemtableThreadPoolExecutor();
+ runningExecutorServices_.add(executor_);
table_ = table;
cfName_ = cfName;
@@ -145,28 +135,6 @@
}
/**
- * Flushes the current memtable to disk.
- *
- * @author alakshman
- *
- */
- class Flusher implements Runnable
- {
- private CommitLog.CommitLogContext cLogCtx_;
-
- Flusher(CommitLog.CommitLogContext cLogCtx)
- {
- cLogCtx_ = cLogCtx;
- }
-
- public void run()
- {
- ColumnFamilyStore cfStore =
Table.open(table_).getColumnFamilyStore(cfName_);
- MemtableManager.instance().submit(cfName_, Memtable.this,
cLogCtx_);
- }
- }
-
- /**
* Compares two Memtable based on creation time.
* @param rhs Memtable to compare to.
* @return a negative integer, zero, or a positive integer as this object
@@ -222,8 +190,7 @@
void printExecutorStats()
{
- DebuggableThreadPoolExecutor es =
(DebuggableThreadPoolExecutor)apartments_.get(cfName_);
- long taskCount = (es.getTaskCount() - es.getCompletedTaskCount());
+ long taskCount = (executor_.getTaskCount() -
executor_.getCompletedTaskCount());
logger_.debug("MEMTABLE TASKS : " + taskCount);
}
@@ -232,25 +199,31 @@
* the memtable. This version will respect the threshold and flush
* the memtable to disk when the size exceeds the threshold.
*/
- void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext
cLogCtx) throws IOException
+ void put(String key, ColumnFamily columnFamily, final
CommitLog.CommitLogContext cLogCtx) throws IOException
{
if (isThresholdViolated(key) )
{
lock_.lock();
try
{
- ColumnFamilyStore cfStore =
Table.open(table_).getColumnFamilyStore(cfName_);
+ final ColumnFamilyStore cfStore =
Table.open(table_).getColumnFamilyStore(cfName_);
if (!isFrozen_)
{
isFrozen_ = true;
- /* Submit this Memtable to be flushed. */
- Runnable flusher = new Flusher(cLogCtx);
- apartments_.get(cfName_).submit(flusher);
- /* switch the memtable */
+ Runnable flushQueuer = new Runnable()
+ {
+ public void run()
+ {
+
MemtableManager.instance().submit(cfStore.getColumnFamilyName(), Memtable.this,
cLogCtx);
+ }
+ };
cfStore.switchMemtable(key, columnFamily, cLogCtx);
+ executor_.runOnTermination(flushQueuer);
+ executor_.shutdown();
}
else
{
+ // retry the put on the new memtable
cfStore.apply(key, columnFamily, cLogCtx);
}
}
@@ -263,7 +236,7 @@
{
printExecutorStats();
Runnable putter = new Putter(key, columnFamily);
- apartments_.get(cfName_).submit(putter);
+ executor_.submit(putter);
}
}
@@ -375,7 +348,7 @@
ColumnFamily cf = null;
try
{
- cf = apartments_.get(cfName_).submit(call).get();
+ cf = executor_.submit(call).get();
}
catch ( ExecutionException ex )
{
@@ -440,4 +413,26 @@
columnFamilies_.clear();
}
+ private static class MemtableThreadPoolExecutor extends
DebuggableThreadPoolExecutor
+ {
+ private ArrayList<Runnable> terminatedHooks = new
ArrayList<Runnable>();
+
+ public MemtableThreadPoolExecutor()
+ {
+ super(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("FAST-MEMTABLE-POOL"));
+ }
+
+ protected void terminated()
+ {
+ super.terminated();
+ runningExecutorServices_.remove(this);
+ for (Runnable hook : terminatedHooks) {
+ hook.run();
+ }
+ }
+
+ public void runOnTermination(Runnable runnable) {
+ terminatedHooks.add(runnable);
+ }
+ }
}