Author: jbellis
Date: Tue Mar 15 19:08:11 2011
New Revision: 1081908

URL: http://svn.apache.org/viewvc?rev=1081908&view=rev
Log:
add memtable_flush_queue_size defaulting to 4
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2333

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/conf/cassandra.yaml
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 15 19:08:11 2011
@@ -3,6 +3,7 @@
  * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
  * clear Built flag in system table when dropping an index (CASSANDRA-2320)
  * validate index names (CASSANDRA-1761)
+ * add memtable_flush_queue_size defaulting to 4 (CASSANDRA-2333)
  * allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
  * queue secondary indexes for flush before the parent (CASSANDRA-2330)
 

Modified: cassandra/branches/cassandra-0.7/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/conf/cassandra.yaml?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/conf/cassandra.yaml Tue Mar 15 19:08:11 2011
@@ -143,6 +143,11 @@ concurrent_writes: 32
 # By default this will be set to the amount of data directories defined.
 #memtable_flush_writers: 1
 
+# The number of full memtables allowed to be pending flush, that is,
+# waiting for a writer thread.  At a minimum, this should be set to
+# the maximum number of secondary indexes created on a single CF.
+memtable_flush_queue_size: 4
+
 # Buffer size to use when performing contiguous column slices. 
 # Increase this to the size of the column slices you typically perform
 sliced_buffer_size_in_kb: 64

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Tue Mar 15 19:08:11 2011
@@ -26,53 +26,63 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This class incorporates some Executor best practices for Cassandra.  Most of the executors in the system
+ * should use or extend this.  There are two main improvements over a vanilla TPE:
+ *
+ * - If a task throws an exception, the default uncaught exception handler will be invoked; if there is
+ *   no such handler, the exception will be logged.
+ * - MaximumPoolSize is not supported.  Here is what that means (quoting TPE javadoc):
+ *
+ *     If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
+ *     If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
+ *     If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
+ *
+ *   We don't want this last stage of creating new threads if the queue is full; it makes it needlessly difficult to
+ *   reason about the system's behavior.  In other words, if DebuggableTPE has allocated our maximum number of (core)
+ *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
+ *   stage is less busy, core thread timeout is enabled.
+ */
 public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 {
     protected static Logger logger = 
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
 
     public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority));
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority));
     }
 
-    public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
ThreadFactory threadFactory)
+    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        allowCoreThreadTimeOut(true);
 
-        if (maximumPoolSize > 1)
+        // block the submitter while the queue is full instead of rejecting the task (for a single-threaded pool
+        // this also preserves task serialization).  TPE rejects if queue.offer reports a full queue, so we override
+        // that with a handler that retries the offer until the task gets in or the pool shuts down.  ugly, but effective.
+        // (there is an extensive analysis of the options here at
+        //  
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+        this.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
-            // clearly strict serialization is not a requirement.  just make 
the calling thread execute.
-            this.setRejectedExecutionHandler(new 
ThreadPoolExecutor.CallerRunsPolicy());
-        }
-        else
-        {
-            // preserve task serialization.  this is more complicated than it 
needs to be,
-            // since TPE rejects if queue.offer reports a full queue.  we'll 
just
-            // override this with a handler that retries until it gets in.  
ugly, but effective.
-            // (there is an extensive analysis of the options here at
-            //  
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
-            this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+            public void rejectedExecution(Runnable task, ThreadPoolExecutor 
executor)
             {
-                public void rejectedExecution(Runnable task, 
ThreadPoolExecutor executor)
+                BlockingQueue<Runnable> queue = executor.getQueue();
+                while (true)
                 {
-                    BlockingQueue<Runnable> queue = executor.getQueue();
-                    while (true)
+                    if (executor.isShutdown())
+                        throw new 
RejectedExecutionException("ThreadPoolExecutor has shut down");
+                    try
+                    {
+                        if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                            break;
+                    }
+                    catch (InterruptedException e)
                     {
-                        if (executor.isShutdown())
-                            throw new 
RejectedExecutionException("ThreadPoolExecutor has shut down");
-                        try
-                        {
-                            if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
-                                break;
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError(e);    
-                        }
+                        throw new AssertionError(e);
                     }
                 }
-            });
-        }
+            }
+        });
     }
 
     public void afterExecute(Runnable r, Throwable t)

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
 Tue Mar 15 19:08:11 2011
@@ -25,14 +25,13 @@ public class JMXConfigurableThreadPoolEx
 {
 
     public JMXConfigurableThreadPoolExecutor(int corePoolSize,
-                                             int maximumPoolSize, 
-                                                long keepAliveTime, 
+                                                long keepAliveTime,
                                                 TimeUnit unit,
                                              BlockingQueue<Runnable> 
workQueue, 
                                              NamedThreadFactory threadFactory,
                                              String jmxPath)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, jmxPath);
+        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory, 
jmxPath);
     }
     
 }
\ No newline at end of file

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
 Tue Mar 15 19:08:11 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.concurrent;
 
 public interface JMXConfigurableThreadPoolExecutorMBean extends 
JMXEnabledThreadPoolExecutorMBean
 {
-
     void setCorePoolSize(int n);
 
     int getCorePoolSize();
-    
 }
\ No newline at end of file

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 Tue Mar 15 19:08:11 2011
@@ -38,28 +38,27 @@ public class JMXEnabledThreadPoolExecuto
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
"internal");
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
"internal");
     }
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
jmxPath);
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName), 
jmxPath);
     }
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority), "internal");
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority), "internal");
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
-                                        int maximumPoolSize,
                                         long keepAliveTime,
                                         TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue,
                                         NamedThreadFactory threadFactory,
                                         String jmxPath)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
 Tue Mar 15 19:08:11 2011
@@ -42,25 +42,20 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, 
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, 
getConcurrentReaders()));        
-        stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2, 
Runtime.getRuntime().availableProcessors())));
-        stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(2, 
Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedStage(Stage.REQUEST_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new 
JMXEnabledThreadPoolExecutor(Stage.STREAM));
         stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
         stages.put(Stage.ANTI_ENTROPY, new 
JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new 
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
-        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, 
Math.max(2, Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, 
Runtime.getRuntime().availableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int 
numThreads)
     {
-        // avoid running afoul of requirement in DebuggableThreadPoolExecutor 
that single-threaded executors
-        // must have unbounded queues
-        assert numThreads > 1 : "multi-threaded stages must have at least 2 
threads";
-
         return new JMXEnabledThreadPoolExecutor(numThreads,
-                                                numThreads,
                                                 KEEPALIVE,
                                                 TimeUnit.SECONDS,
                                                 new 
LinkedBlockingQueue<Runnable>(),
@@ -70,10 +65,7 @@ public class StageManager
     
     private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage 
stage, int numThreads)
     {
-        assert numThreads > 1 : "multi-threaded stages must have at least 2 
threads";
-        
         return new JMXConfigurableThreadPoolExecutor(numThreads,
-                                                     numThreads,
                                                      KEEPALIVE,
                                                      TimeUnit.SECONDS,
                                                      new 
LinkedBlockingQueue<Runnable>(),

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/Config.java
 Tue Mar 15 19:08:11 2011
@@ -112,6 +112,7 @@ public class Config
     public boolean compaction_preheat_key_cache = true;
 
     public boolean incremental_backups = false;
+    public int memtable_flush_queue_size = 4;
 
     public static enum CommitLogSync {
         periodic,

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Tue Mar 15 19:08:11 2011
@@ -1218,4 +1218,9 @@ public class    DatabaseDescriptor
     {
         return conf.incremental_backups;
     }
+
+    public static int getFlushQueueSize()
+    {
+        return conf.memtable_flush_queue_size;
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Tue Mar 15 19:08:11 2011
@@ -78,19 +78,17 @@ public class ColumnFamilyStore implement
      * called, all data up to the given context has been persisted to SSTables.
      */
     private static final ExecutorService flushSorter
-            = new JMXEnabledThreadPoolExecutor(1,
-                                               
Runtime.getRuntime().availableProcessors(),
+            = new 
JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new 
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
                                                new 
NamedThreadFactory("FlushSorter"),
                                                "internal");
     private static final ExecutorService flushWriter
-            = new JMXEnabledThreadPoolExecutor(1,
-                                               
DatabaseDescriptor.getFlushWriters(),
+            = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
-                                               new 
SynchronousQueue<Runnable>(),
+                                               new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
                                                new 
NamedThreadFactory("FlushWriter"),
                                                "internal");
     public static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=1081908&r1=1081907&r2=1081908&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
 Tue Mar 15 19:08:11 2011
@@ -35,7 +35,6 @@ public class DebuggableThreadPoolExecuto
     {
         LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
         DebuggableThreadPoolExecutor executor = new 
DebuggableThreadPoolExecutor(1,
-                                                                               
  1,
                                                                                
  Integer.MAX_VALUE,
                                                                                
  TimeUnit.MILLISECONDS,
                                                                                
  q,


Reply via email to