Move flushWriter into Memtable now that there is only one such service (there used to be separate executors for memtables and binarymemtables)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc0f2472 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc0f2472 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc0f2472 Branch: refs/heads/trunk Commit: fc0f2472d874db1cdbf8f49da61fa70b08da17f8 Parents: 1fac06a Author: Jonathan Ellis <[email protected]> Authored: Mon Jan 14 16:29:09 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jan 14 16:29:09 2013 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnFamilyStore.java | 22 +----------- src/java/org/apache/cassandra/db/Memtable.java | 27 +++++++++++++- 2 files changed, 26 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc0f2472/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b01545a..b832b98 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -80,26 +80,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - /* - * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, - * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. - * - * There are two other things that switchMemtable does. - * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete - * and it's been added as an SSTableReader to ssTables_. 
Second, it adds an entry to commitLogUpdater - * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes - * to happen simultaneously on multicore systems, while still calling onMF in the correct order, - * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is - * called, all data up to the given context has been persisted to SSTables. - */ - private static final ExecutorService flushWriter - = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()), - new NamedThreadFactory("FlushWriter"), - "internal"); - public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher"); static @@ -658,7 +638,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean else { logger.info("Enqueuing flush of {}", memtable); - memtable.flushAndSignal(latch, flushWriter, ctx); + memtable.flushAndSignal(latch, ctx); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc0f2472/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 56e2bf5..990ad84 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Function; import com.google.common.base.Throwables; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.util.DiskAwareRunnable; import 
org.cliffc.high_scale_lib.NonBlockingHashSet; @@ -51,6 +54,26 @@ public class Memtable { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); + /* + * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, + * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. + * + * There are two other things that switchMemtable does. + * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete + * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater + * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes + * to happen simultaneously on multicore systems, while still calling onMF in the correct order, + * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is + * called, all data up to the given context has been persisted to SSTables. + */ + private static final ExecutorService flushWriter + = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()), + new NamedThreadFactory("FlushWriter"), + "internal"); + // size in memory can never be less than serialized size private static final double MIN_SANE_LIVE_RATIO = 1.0; // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken. @@ -252,9 +275,9 @@ public class Memtable return builder.toString(); } - public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context) + public void flushAndSignal(final CountDownLatch latch, final Future<ReplayPosition> context) { - writer.execute(new FlushRunnable(latch, context)); + flushWriter.execute(new FlushRunnable(latch, context)); } public String toString()
