Author: jbellis
Date: Tue Mar 15 19:08:11 2011
New Revision: 1081908
URL: http://svn.apache.org/viewvc?rev=1081908&view=rev
Log:
add memtable_flush_queue_size defaulting to 4
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2333
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/conf/cassandra.yaml
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 15 19:08:11 2011
@@ -3,6 +3,7 @@
* fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
* clear Built flag in system table when dropping an index (CASSANDRA-2320)
* validate index names (CASSANDRA-1761)
+ * add memtable_flush_queue_size defaulting to 4 (CASSANDRA-2333)
* allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
* queue secondary indexes for flush before the parent (CASSANDRA-2330)
Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/conf/cassandra.yaml?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/conf/cassandra.yaml Tue Mar 15 19:08:11
2011
@@ -143,6 +143,11 @@ concurrent_writes: 32
# By default this will be set to the amount of data directories defined.
#memtable_flush_writers: 1
+# the number of full memtables to allow pending flush, that is,
+# waiting for a writer thread. At a minimum, this should be set to
+# the maximum number of secondary indexes created on a single CF.
+memtable_flush_queue_size: 4
+
# Buffer size to use when performing contiguous column slices.
# Increase this to the size of the column slices you typically perform
sliced_buffer_size_in_kb: 64
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Tue Mar 15 19:08:11 2011
@@ -26,53 +26,63 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * This class incorporates some Executor best practices for Cassandra. Most
of the executors in the system
+ * should use or extend this. There are two main improvements over a vanilla
TPE:
+ *
+ * - If a task throws an exception, the default uncaught exception handler
will be invoked; if there is
+ * no such handler, the exception will be logged.
+ * - MaximumPoolSize is not supported. Here is what that means (quoting TPE
javadoc):
+ *
+ * If fewer than corePoolSize threads are running, the Executor always
prefers adding a new thread rather than queuing.
+ * If corePoolSize or more threads are running, the Executor always
prefers queuing a request rather than adding a new thread.
+ * If a request cannot be queued, a new thread is created unless this
would exceed maximumPoolSize, in which case, the task will be rejected.
+ *
+ * We don't want this last stage of creating new threads if the queue is
full; it makes it needlessly difficult to
+ * reason about the system's behavior. In other words, if DebuggableTPE has
allocated our maximum number of (core)
+ * threads and the queue is full, we want the enqueuer to block. But to
allow the number of threads to drop if a
+ * stage is less busy, core thread timeout is enabled.
+ */
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
protected static Logger logger =
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority));
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority));
}
- public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
+ public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ allowCoreThreadTimeOut(true);
- if (maximumPoolSize > 1)
+ // preserve task serialization. this is more complicated than it
needs to be,
+ // since TPE rejects if queue.offer reports a full queue. we'll just
+ // override this with a handler that retries until it gets in. ugly,
but effective.
+ // (there is an extensive analysis of the options here at
+ //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+ this.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
- // clearly strict serialization is not a requirement. just make
the calling thread execute.
- this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
- }
- else
- {
- // preserve task serialization. this is more complicated than it
needs to be,
- // since TPE rejects if queue.offer reports a full queue. we'll
just
- // override this with a handler that retries until it gets in.
ugly, but effective.
- // (there is an extensive analysis of the options here at
- //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
- this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor
executor)
{
- public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor)
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
{
- BlockingQueue<Runnable> queue = executor.getQueue();
- while (true)
+ if (executor.isShutdown())
+ throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
{
- if (executor.isShutdown())
- throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
- try
- {
- if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
- break;
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ throw new AssertionError(e);
}
}
- });
- }
+ }
+ });
}
public void afterExecute(Runnable r, Throwable t)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
Tue Mar 15 19:08:11 2011
@@ -25,14 +25,13 @@ public class JMXConfigurableThreadPoolEx
{
public JMXConfigurableThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
+ long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable>
workQueue,
NamedThreadFactory threadFactory,
String jmxPath)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, jmxPath);
+ super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory,
jmxPath);
}
}
\ No newline at end of file
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
Tue Mar 15 19:08:11 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.concurrent;
public interface JMXConfigurableThreadPoolExecutorMBean extends
JMXEnabledThreadPoolExecutorMBean
{
-
void setCorePoolSize(int n);
int getCorePoolSize();
-
}
\ No newline at end of file
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Tue Mar 15 19:08:11 2011
@@ -38,28 +38,27 @@ public class JMXEnabledThreadPoolExecuto
public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
"internal");
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
"internal");
}
public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
jmxPath);
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
jmxPath);
}
public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority), "internal");
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority), "internal");
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
NamedThreadFactory threadFactory,
String jmxPath)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
Tue Mar 15 19:08:11 2011
@@ -42,25 +42,20 @@ public class StageManager
{
stages.put(Stage.MUTATION,
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ,
getConcurrentReaders()));
- stages.put(Stage.REQUEST_RESPONSE,
multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
- stages.put(Stage.INTERNAL_RESPONSE,
multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.REQUEST_RESPONSE,
multiThreadedStage(Stage.REQUEST_RESPONSE,
Runtime.getRuntime().availableProcessors()));
+ stages.put(Stage.INTERNAL_RESPONSE,
multiThreadedStage(Stage.INTERNAL_RESPONSE,
Runtime.getRuntime().availableProcessors()));
// the rest are all single-threaded
stages.put(Stage.STREAM, new
JMXEnabledThreadPoolExecutor(Stage.STREAM));
stages.put(Stage.GOSSIP, new
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
stages.put(Stage.ANTI_ENTROPY, new
JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
- stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR,
Math.max(2, Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR,
Runtime.getRuntime().availableProcessors()));
}
private static ThreadPoolExecutor multiThreadedStage(Stage stage, int
numThreads)
{
- // avoid running afoul of requirement in DebuggableThreadPoolExecutor
that single-threaded executors
- // must have unbounded queues
- assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
-
return new JMXEnabledThreadPoolExecutor(numThreads,
- numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
@@ -70,10 +65,7 @@ public class StageManager
private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage
stage, int numThreads)
{
- assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
-
return new JMXConfigurableThreadPoolExecutor(numThreads,
- numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
Tue Mar 15 19:08:11 2011
@@ -112,6 +112,7 @@ public class Config
public boolean compaction_preheat_key_cache = true;
public boolean incremental_backups = false;
+ public int memtable_flush_queue_size = 4;
public static enum CommitLogSync {
periodic,
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Tue Mar 15 19:08:11 2011
@@ -1218,4 +1218,9 @@ public class DatabaseDescriptor
{
return conf.incremental_backups;
}
+
+ public static int getFlushQueueSize()
+ {
+ return conf.memtable_flush_queue_size;
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Tue Mar 15 19:08:11 2011
@@ -78,19 +78,17 @@ public class ColumnFamilyStore implement
* called, all data up to the given context has been persisted to SSTables.
*/
private static final ExecutorService flushSorter
- = new JMXEnabledThreadPoolExecutor(1,
-
Runtime.getRuntime().availableProcessors(),
+ = new
JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FlushSorter"),
"internal");
private static final ExecutorService flushWriter
- = new JMXEnabledThreadPoolExecutor(1,
-
DatabaseDescriptor.getFlushWriters(),
+ = new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
- new
SynchronousQueue<Runnable>(),
+ new
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
new
NamedThreadFactory("FlushWriter"),
"internal");
public static final ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Tue Mar 15 19:08:11 2011
@@ -35,7 +35,6 @@ public class DebuggableThreadPoolExecuto
{
LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
DebuggableThreadPoolExecutor executor = new
DebuggableThreadPoolExecutor(1,
-
1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
q,