Author: jbellis
Date: Fri Mar 19 21:32:22 2010
New Revision: 925440
URL: http://svn.apache.org/viewvc?rev=925440&view=rev
Log:
bound read, mutation, and response stages to fix possible OOM during log
replay. patch by jbellis; tested by Brandon Williams for CASSANDRA-885
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Mar 19 21:32:22 2010
@@ -3,6 +3,10 @@
* Bootstrapping can skip ranges under the right conditions (CASSANDRA-902)
* fix merging row versions in range_slice for CL > ONE (CASSANDRA-884)
* default write ConsistencyLevel changed from ZERO to ONE
+ * use lexical comparison if time part of TimeUUIDs are the same
+ (CASSANDRA-907)
+ * bound read, mutation, and response stages to fix possible OOM
+ during log replay (CASSANDRA-885)
0.6.0-beta3
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
Fri Mar 19 21:32:22 2010
@@ -21,15 +21,14 @@ package org.apache.cassandra.concurrent;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.config.DatabaseDescriptor;
-import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
+import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
/**
@@ -53,7 +52,7 @@ public class StageManager
{
stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE,
getConcurrentWriters()));
stages.put(READ_STAGE, multiThreadedStage(READ_STAGE,
getConcurrentReaders()));
- stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE",
Runtime.getRuntime().availableProcessors()));
+ stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE",
Math.max(2, Runtime.getRuntime().availableProcessors())));
// the rest are all single-threaded
stages.put(STREAM_STAGE, new
JMXEnabledThreadPoolExecutor(STREAM_STAGE));
stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
@@ -63,11 +62,15 @@ public class StageManager
private static ThreadPoolExecutor multiThreadedStage(String name, int
numThreads)
{
+ // avoid running afoul of requirement in DebuggableThreadPoolExecutor
that single-threaded executors
+ // must have unbounded queues
+ assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
+
return new JMXEnabledThreadPoolExecutor(numThreads,
numThreads,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new
LinkedBlockingQueue<Runnable>(),
+ new
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
new NamedThreadFactory(name));
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Fri Mar 19 21:32:22 2010
@@ -152,6 +152,8 @@ public class DatabaseDescriptor
throw new RuntimeException("Cannot locate " + STORAGE_CONF_FILE + "
via storage-config system property or classpath lookup.");
}
+ private static int stageQueueSize_ = 4096;
+
static
{
try
@@ -290,11 +292,20 @@ public class DatabaseDescriptor
{
concurrentReaders = Integer.parseInt(rawReaders);
}
+ if (concurrentReaders < 2)
+ {
+ throw new ConfigurationException("ConcurrentReads must be at
least 2");
+ }
+
String rawWriters =
xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
if (rawWriters != null)
{
concurrentWriters = Integer.parseInt(rawWriters);
}
+ if (concurrentWriters < 2)
+ {
+ throw new ConfigurationException("ConcurrentWrites must be at
least 2");
+ }
String rawFlushData =
xmlUtils.getNodeValue("/Storage/FlushDataBufferSizeInMB");
if (rawFlushData != null)
@@ -1092,6 +1103,11 @@ public class DatabaseDescriptor
return getCFMetaData(tableName, cfName).subcolumnComparator;
}
+ public static int getStageQueueSize()
+ {
+ return stageQueueSize_;
+ }
+
/**
* @return The absolute number of keys that should be cached per table.
*/