Author: jbellis
Date: Tue Mar 15 20:12:03 2011
New Revision: 1081925
URL: http://svn.apache.org/viewvc?rev=1081925&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1081897
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7:1026516-1081924
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Mar 15 20:12:03 2011
@@ -18,8 +18,10 @@
* fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
* clear Built flag in system table when dropping an index (CASSANDRA-2320)
* validate index names (CASSANDRA-1761)
+ * add memtable_flush_queue_size defaulting to 4 (CASSANDRA-2333)
* allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
* queue secondary indexes for flush before the parent (CASSANDRA-2330)
+ * shut down server for OOM on a Thrift thread (CASSANDRA-2269)
>>>>>>> .merge-right.r1081840
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Tue Mar 15 20:12:03 2011
@@ -148,6 +148,11 @@ concurrent_writes: 32
# By default this will be set to the amount of data directories defined.
#memtable_flush_writers: 1
+# the number of full memtables to allow pending flush, that is,
+# waiting for a writer thread. At a minimum, this should be set to
+# the maximum number of secondary indexes created on a single CF.
+memtable_flush_queue_size: 4
+
# Buffer size to use when performing contiguous column slices.
# Increase this to the size of the column slices you typically perform
sliced_buffer_size_in_kb: 64
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1081897
+/cassandra/branches/cassandra-0.7/contrib:1026516-1081924
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1081924
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1081924
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1081924
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1081924
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1081924
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -26,59 +26,74 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * This class incorporates some Executor best practices for Cassandra. Most
of the executors in the system
+ * should use or extend this. There are two main improvements over a vanilla
TPE:
+ *
+ * - If a task throws an exception, the default uncaught exception handler
will be invoked; if there is
+ * no such handler, the exception will be logged.
+ * - MaximumPoolSize is not supported. Here is what that means (quoting TPE
javadoc):
+ *
+ * If fewer than corePoolSize threads are running, the Executor always
prefers adding a new thread rather than queuing.
+ * If corePoolSize or more threads are running, the Executor always
prefers queuing a request rather than adding a new thread.
+ * If a request cannot be queued, a new thread is created unless this
would exceed maximumPoolSize, in which case, the task will be rejected.
+ *
+ * We don't want this last stage of creating new threads if the queue is
full; it makes it needlessly difficult to
+ * reason about the system's behavior. In other words, if DebuggableTPE has
allocated our maximum number of (core)
+ * threads and the queue is full, we want the enqueuer to block. But to
allow the number of threads to drop if a
+ * stage is less busy, core thread timeout is enabled.
+ */
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
protected static Logger logger =
LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority));
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority));
}
- public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
+ public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ allowCoreThreadTimeOut(true);
- if (maximumPoolSize > 1)
+ // preserve task serialization. this is more complicated than it
needs to be,
+ // since TPE rejects if queue.offer reports a full queue. we'll just
+ // override this with a handler that retries until it gets in. ugly,
but effective.
+ // (there is an extensive analysis of the options here at
+ //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+ this.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
- // clearly strict serialization is not a requirement. just make
the calling thread execute.
- this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
- }
- else
- {
- // preserve task serialization. this is more complicated than it
needs to be,
- // since TPE rejects if queue.offer reports a full queue. we'll
just
- // override this with a handler that retries until it gets in.
ugly, but effective.
- // (there is an extensive analysis of the options here at
- //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
- this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor
executor)
{
- public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor)
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
{
- BlockingQueue<Runnable> queue = executor.getQueue();
- while (true)
+ if (executor.isShutdown())
+ throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
{
- if (executor.isShutdown())
- throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
- try
- {
- if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
- break;
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ throw new AssertionError(e);
}
}
- });
- }
+ }
+ });
}
+ @Override
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r,t);
+ logExceptionsAfterExecute(r, t);
+ }
+ public static void logExceptionsAfterExecute(Runnable r, Throwable t)
+ {
// exceptions wrapped by FutureTask
if (r instanceof FutureTask<?>)
{
@@ -92,7 +107,9 @@ public class DebuggableThreadPoolExecuto
}
catch (ExecutionException e)
{
- if (Thread.getDefaultUncaughtExceptionHandler() != null)
+ if (Thread.getDefaultUncaughtExceptionHandler() == null)
+ logger.error("Error in ThreadPoolExecutor", e.getCause());
+ else
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
e.getCause());
}
}
@@ -100,7 +117,10 @@ public class DebuggableThreadPoolExecuto
// exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
if (t != null)
{
- logger.error("Error in ThreadPoolExecutor", t);
+ if (Thread.getDefaultUncaughtExceptionHandler() == null)
+ logger.error("Error in ThreadPoolExecutor", t);
+ else
+
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
t);
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -25,14 +25,13 @@ public class JMXConfigurableThreadPoolEx
{
public JMXConfigurableThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
+ long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable>
workQueue,
NamedThreadFactory threadFactory,
String jmxPath)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, jmxPath);
+ super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory,
jmxPath);
}
}
\ No newline at end of file
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
Tue Mar 15 20:12:03 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.concurrent;
public interface JMXConfigurableThreadPoolExecutorMBean extends
JMXEnabledThreadPoolExecutorMBean
{
-
void setCorePoolSize(int n);
int getCorePoolSize();
-
}
\ No newline at end of file
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -38,28 +38,27 @@ public class JMXEnabledThreadPoolExecuto
public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
"internal");
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
"internal");
}
public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
jmxPath);
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName),
jmxPath);
}
public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
{
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority), "internal");
+ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName,
priority), "internal");
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
NamedThreadFactory threadFactory,
String jmxPath)
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
Tue Mar 15 20:12:03 2011
@@ -41,8 +41,8 @@ public class StageManager
{
stages.put(Stage.MUTATION,
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ,
getConcurrentReaders()));
- stages.put(Stage.REQUEST_RESPONSE,
multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
- stages.put(Stage.INTERNAL_RESPONSE,
multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.REQUEST_RESPONSE,
multiThreadedStage(Stage.REQUEST_RESPONSE,
Runtime.getRuntime().availableProcessors()));
+ stages.put(Stage.INTERNAL_RESPONSE,
multiThreadedStage(Stage.INTERNAL_RESPONSE,
Runtime.getRuntime().availableProcessors()));
stages.put(Stage.REPLICATE_ON_WRITE,
multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE,
getConcurrentReplicators()));
// the rest are all single-threaded
stages.put(Stage.STREAM, new
JMXEnabledThreadPoolExecutor(Stage.STREAM));
@@ -50,17 +50,12 @@ public class StageManager
stages.put(Stage.ANTI_ENTROPY, new
JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
- stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR,
Math.max(2, Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR,
Runtime.getRuntime().availableProcessors()));
}
private static ThreadPoolExecutor multiThreadedStage(Stage stage, int
numThreads)
{
- // avoid running afoul of requirement in DebuggableThreadPoolExecutor
that single-threaded executors
- // must have unbounded queues
- assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
-
return new JMXEnabledThreadPoolExecutor(numThreads,
- numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
@@ -70,10 +65,7 @@ public class StageManager
private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage
stage, int numThreads)
{
- assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
-
return new JMXConfigurableThreadPoolExecutor(numThreads,
- numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue Mar 15
20:12:03 2011
@@ -115,6 +115,7 @@ public class Config
public boolean compaction_preheat_key_cache = true;
public boolean incremental_backups = false;
+ public int memtable_flush_queue_size = 4;
public static enum CommitLogSync {
periodic,
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Tue Mar 15 20:12:03 2011
@@ -1248,4 +1248,9 @@ public class DatabaseDescriptor
{
return conf.incremental_backups;
}
+
+ public static int getFlushQueueSize()
+ {
+ return conf.memtable_flush_queue_size;
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue
Mar 15 20:12:03 2011
@@ -79,19 +79,17 @@ public class ColumnFamilyStore implement
* called, all data up to the given context has been persisted to SSTables.
*/
private static final ExecutorService flushSorter
- = new JMXEnabledThreadPoolExecutor(1,
-
Runtime.getRuntime().availableProcessors(),
+ = new
JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FlushSorter"),
"internal");
private static final ExecutorService flushWriter
- = new JMXEnabledThreadPoolExecutor(1,
-
DatabaseDescriptor.getFlushWriters(),
+ = new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
- new
SynchronousQueue<Runnable>(),
+ new
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
new
NamedThreadFactory("FlushWriter"),
"internal");
public static final ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Tue Mar 15 20:12:03 2011
@@ -33,6 +33,7 @@ import org.apache.log4j.PropertyConfigur
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -363,6 +364,7 @@ public abstract class AbstractCassandraD
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
+ DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
state.get().logout();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Tue Mar 15 20:12:03 2011
@@ -74,36 +74,23 @@ public class WriteResponseHandler extend
protected int determineBlockFor(String table)
{
- int blockFor = 0;
switch (consistencyLevel)
{
case ONE:
- blockFor = 1;
- break;
+ return 1;
case ANY:
- blockFor = 1;
- break;
+ return 1;
case TWO:
- blockFor = 2;
- break;
+ return 2;
case THREE:
- blockFor = 3;
- break;
+ return 3;
case QUORUM:
- blockFor = (writeEndpoints.size() / 2) + 1;
- break;
+ return (writeEndpoints.size() / 2) + 1;
case ALL:
- blockFor = writeEndpoints.size();
- break;
+ return writeEndpoints.size();
default:
throw new UnsupportedOperationException("invalid consistency
level: " + consistencyLevel.toString());
}
- // at most one node per range can bootstrap at a time, and these will
be added to the write until
- // bootstrap finishes (at which point we no longer need to write to
the old ones).
- assert 1 <= blockFor && blockFor <= 2 *
Table.open(table).getReplicationStrategy().getReplicationFactor()
- : String.format("invalid response count %d for replication factor
%d",
- blockFor,
Table.open(table).getReplicationStrategy().getReplicationFactor());
- return blockFor;
}
public void assureSufficientLiveNodes() throws UnavailableException
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Tue Mar 15 20:12:03 2011
@@ -35,7 +35,6 @@ public class DebuggableThreadPoolExecuto
{
LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
DebuggableThreadPoolExecutor executor = new
DebuggableThreadPoolExecutor(1,
-
1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
q,