Author: jbellis
Date: Thu Apr 16 20:42:00 2009
New Revision: 765754
URL: http://svn.apache.org/viewvc?rev=765754&view=rev
Log:
Make forceFlush block until the flush action is queued on MemtableManager. That
way, calling forceFlush followed by waitForFlush guarantees that the action
waitForFlush puts on MemtableManager will run after the flush completes, i.e.,
the wait will actually do what it's supposed to.
patch by jbellis; reviewed by Eric Evans for #59
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=765754&r1=765753&r2=765754&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Thu Apr
16 20:42:00 2009
@@ -30,6 +30,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -210,15 +211,8 @@
if (!isFrozen_)
{
isFrozen_ = true;
- Runnable flushQueuer = new Runnable()
- {
- public void run()
- {
-
MemtableManager.instance().submit(cfStore.getColumnFamilyName(), Memtable.this,
cLogCtx);
- }
- };
cfStore.switchMemtable(key, columnFamily, cLogCtx);
- executor_.runOnTermination(flushQueuer);
+ executor_.flushWhenTerminated(cLogCtx);
executor_.shutdown();
}
else
@@ -242,7 +236,8 @@
/*
* This version is used to switch memtable and force flush.
- * Flushing is still done in a separate executor -- forceFlush does not
block.
+ * Flushing is still done in a separate executor -- forceFlush only blocks
+ * until the flush runnable is queued.
*/
public void forceflush(ColumnFamilyStore cfStore) throws IOException
{
@@ -257,10 +252,11 @@
rm.add(cfStore.getColumnFamilyName() + ":Column",
"0".getBytes(), 0);
}
rm.apply();
+ executor_.flushQueuer.get();
}
- catch(ColumnFamilyNotDefinedException ex)
+ catch (Exception ex)
{
- logger_.debug(LogUtil.throwableToString(ex));
+ throw new RuntimeException(ex);
}
}
@@ -413,9 +409,9 @@
columnFamilies_.clear();
}
- private static class MemtableThreadPoolExecutor extends
DebuggableThreadPoolExecutor
+ private class MemtableThreadPoolExecutor extends
DebuggableThreadPoolExecutor
{
- private ArrayList<Runnable> terminatedHooks = new
ArrayList<Runnable>();
+ FutureTask flushQueuer;
public MemtableThreadPoolExecutor()
{
@@ -426,13 +422,22 @@
{
super.terminated();
runningExecutorServices_.remove(this);
- for (Runnable hook : terminatedHooks) {
- hook.run();
+ if (flushQueuer != null)
+ {
+ flushQueuer.run();
}
}
- public void runOnTermination(Runnable runnable) {
- terminatedHooks.add(runnable);
+ public void flushWhenTerminated(final CommitLog.CommitLogContext
cLogCtx)
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ MemtableManager.instance().submit(cfName_, Memtable.this,
cLogCtx);
+ }
+ };
+ flushQueuer = new FutureTask(runnable, null);
}
}
}
Modified:
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=765754&r1=765753&r2=765754&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Thu Apr 16 20:42:00 2009
@@ -164,7 +164,6 @@
}
});
f.get();
- Thread.sleep(1000);
}
private void validateNameSort(Table table)