Author: jbellis
Date: Wed Sep 23 01:57:15 2009
New Revision: 817925
URL: http://svn.apache.org/viewvc?rev=817925&view=rev
Log:
support multiple flush threads safely. automatically use up to available core
count threads for flushing. pause updates when too many unflushed memtables
are generated.
patch by jbellis; reviewed by goffinet for CASSANDRA-401
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Sep 23 01:57:15 2009
@@ -299,12 +299,9 @@
<GCGraceSeconds>864000</GCGraceSeconds>
<!--
- ~ Number of threads to run when flushing memtables to disk. Set this to
- ~ the number of disks you physically have in your machine allocated for
DataDirectory * 2.
- ~ If you are planning to use the Binary Memtable, its recommended to
increase the max threads
- ~ to maintain a higher quality of service while under load when normal
memtables are flushing to disk.
+ ~ The threshold size in megabytes the binary memtable must grow to,
+ ~ before it's submitted for flushing to disk.
-->
- <FlushMinThreads>1</FlushMinThreads>
- <FlushMaxThreads>1</FlushMaxThreads>
+ <BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>
</Storage>
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Sep 23 01:57:15 2009
@@ -74,8 +74,7 @@
private static int slicedReadBufferSizeInKB_ = 64;
private static List<String> tables_ = new ArrayList<String>();
private static Set<String> applicationColumnFamilies_ = new
HashSet<String>();
- private static int flushMinThreads_ = 1;
- private static int flushMaxThreads_ = 1;
+ private static int bmtThreshold_ = 256;
// Default descriptive names for introspection. The user can override
// these choices in the config file. These are not case sensitive.
@@ -272,16 +271,10 @@
slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
}
- String rawflushMinThreads =
xmlUtils.getNodeValue("/Storage/FlushMinThreads");
- if (rawflushMinThreads != null)
+ String bmtThreshold =
xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
+ if (bmtThreshold != null)
{
- flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
- }
-
- String rawflushMaxThreads =
xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
- if (rawflushMaxThreads != null)
- {
- flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
+ bmtThreshold_ = Integer.parseInt(bmtThreshold);
}
/* TCP port on which the storage system listens */
@@ -995,14 +988,8 @@
return slicedReadBufferSizeInKB_;
}
- public static int getFlushMinThreads()
+ public static int getBMTThreshold()
{
- return flushMinThreads_;
+ return bmtThreshold_;
}
-
- public static int getFlushMaxThreads()
- {
- return flushMaxThreads_;
- }
-
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Sep 23 01:57:15 2009
@@ -28,7 +28,6 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
@@ -55,8 +54,14 @@
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
private static NonBlockingHashMap<String, Set<Memtable>>
memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
- private static ExecutorService flusher_ = new
DebuggableThreadPoolExecutor(DatabaseDescriptor.getFlushMinThreads(),
DatabaseDescriptor.getFlushMaxThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new
ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
-
+ private static DebuggableThreadPoolExecutor flusher_ = new
DebuggableThreadPoolExecutor(1,
+
Runtime.getRuntime().availableProcessors(),
+
Integer.MAX_VALUE,
+
TimeUnit.SECONDS,
+
new LinkedBlockingQueue<Runnable>(2 *
Runtime.getRuntime().availableProcessors()),
+
new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+ private static ExecutorService commitLogUpdater_ = new
DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
@@ -317,9 +322,8 @@
columnFamily_, SSTable.TEMPFILE_MARKER,
fileIndexGenerator_.incrementAndGet());
}
- void switchMemtable(Memtable oldMemtable)
+ Future<?> switchMemtable(Memtable oldMemtable) throws IOException
{
- CommitLog.CommitLogContext ctx = null;
/**
* If we can get the writelock, that means no new updates can come in
and
* all ongoing updates to memtables have completed. We can get the
tail
@@ -330,34 +334,43 @@
Table.flusherLock_.writeLock().lock();
try
{
- try
- {
- ctx = CommitLog.open().getContext();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ final CommitLog.CommitLogContext ctx =
CommitLog.open().getContext();
if (oldMemtable.isFrozen())
{
- return;
+ return null;
}
logger_.info(columnFamily_ + " has reached its threshold;
switching in a fresh Memtable");
oldMemtable.freeze();
getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable);
// it's ok for the MT to briefly be both active and pendingFlush
- submitFlush(oldMemtable, ctx);
+ final Future<?> future = submitFlush(oldMemtable, ctx);
memtable_ = new Memtable(table_, columnFamily_);
-
- if (memtableSwitchCount == Integer.MAX_VALUE)
+ // a second executor that makes sure the onMemtableFlushes get
called in the right order,
+ // while keeping the wait-for-flush (future.get) out of anything
latency-sensitive.
+ return commitLogUpdater_.submit(new Runnable()
{
- memtableSwitchCount = 0;
- }
- memtableSwitchCount++;
+ public void run()
+ {
+ try
+ {
+ future.get();
+ onMemtableFlush(ctx);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
finally
{
Table.flusherLock_.writeLock().unlock();
+ if (memtableSwitchCount == Integer.MAX_VALUE)
+ {
+ memtableSwitchCount = 0;
+ }
+ memtableSwitchCount++;
}
}
@@ -367,27 +380,20 @@
binaryMemtable_.get().put(key, buffer);
}
- public void forceFlush()
+ public Future<?> forceFlush() throws IOException
{
if (memtable_.isClean())
- return;
+ return null;
- switchMemtable(memtable_);
+ return switchMemtable(memtable_);
}
void forceBlockingFlush() throws IOException, ExecutionException,
InterruptedException
{
Memtable oldMemtable = getMemtableThreadSafe();
- forceFlush();
- // block for flush to finish by adding a no-op action to the flush
executorservice
- // and waiting for that to finish. (this works since flush ES is
single-threaded.)
- Future f = flusher_.submit(new Runnable()
- {
- public void run()
- {
- }
- });
- f.get();
+ Future<?> future = forceFlush();
+ if (future != null)
+ future.get();
/* this assert is not threadsafe -- the memtable could have been clean
when forceFlush
checked it, but dirty now thanks to another thread. But as long as
we are only
calling this from single-threaded test code it is useful to have as
a sanity check. */
@@ -882,10 +888,10 @@
}
/* Submit memtables to be flushed to disk */
- private static void submitFlush(final Memtable memtable, final
CommitLog.CommitLogContext cLogCtx)
+ private static Future<?> submitFlush(final Memtable memtable, final
CommitLog.CommitLogContext cLogCtx)
{
logger_.info("Enqueuing flush of " + memtable);
- flusher_.submit(new Runnable()
+ return flusher_.submit(new Runnable()
{
public void run()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
Wed Sep 23 01:57:15 2009
@@ -18,6 +18,9 @@
package org.apache.cassandra.db;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
/**
* The MBean interface for ColumnFamilyStore
*/
@@ -54,7 +57,7 @@
/**
* Triggers an immediate memtable flush.
*/
- public void forceFlush();
+ public Future<?> forceFlush() throws IOException;
/**
* @return the number of read operations on this column family in the last
minute
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Wed Sep 23 01:57:15 2009
@@ -219,7 +219,6 @@
}
}
SSTableReader ssTable = writer.closeAndOpenReader();
- cfStore.onMemtableFlush(cLogCtx);
cfStore.addSSTable(ssTable);
buffer.close();
isFlushed_ = true;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Wed Sep 23 01:57:15 2009
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
import org.apache.log4j.Logger;
@@ -62,7 +63,14 @@
{
public void run()
{
- columnFamilyStore.forceFlush();
+ try
+ {
+ columnFamilyStore.forceFlush();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes,
flushPeriodInMinutes, TimeUnit.MINUTES);