Author: jbellis
Date: Fri Mar 19 21:32:22 2010
New Revision: 925440

URL: http://svn.apache.org/viewvc?rev=925440&view=rev
Log:
bound read, mutation, and response stages to fix possible OOM during log 
replay.  patch by jbellis; tested by Brandon Williams for CASSANDRA-885

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Mar 19 21:32:22 2010
@@ -3,6 +3,10 @@
  * Bootstrapping can skip ranges under the right conditions (CASSANDRA-902)
  * fix merging row versions in range_slice for CL > ONE (CASSANDRA-884)
  * default write ConsistencyLevel changed from ZERO to ONE
+ * use lexical comparison if time part of TimeUUIDs are the same 
+   (CASSANDRA-907)
+ * bound read, mutation, and response stages to fix possible OOM
+   during log replay (CASSANDRA-885)
 
 
 0.6.0-beta3

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
 Fri Mar 19 21:32:22 2010
@@ -21,15 +21,14 @@ package org.apache.cassandra.concurrent;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
-import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
 
 
 /**
@@ -53,7 +52,7 @@ public class StageManager
     {
         stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, 
getConcurrentWriters()));
         stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, 
getConcurrentReaders()));
-        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", 
Runtime.getRuntime().availableProcessors()));
+        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", 
Math.max(2, Runtime.getRuntime().availableProcessors())));
         // the rest are all single-threaded
         stages.put(STREAM_STAGE, new 
JMXEnabledThreadPoolExecutor(STREAM_STAGE));
         stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
@@ -63,11 +62,15 @@ public class StageManager
 
     private static ThreadPoolExecutor multiThreadedStage(String name, int 
numThreads)
     {
+        // avoid running afoul of requirement in DebuggableThreadPoolExecutor that single-threaded executors
+        // must have unbounded queues
+        assert numThreads > 1 : "multi-threaded stages must have at least 2 threads";
+
         return new JMXEnabledThreadPoolExecutor(numThreads,
                                                 numThreads,
                                                 Integer.MAX_VALUE,
                                                 TimeUnit.SECONDS,
-                                                new 
LinkedBlockingQueue<Runnable>(),
+                                                new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
                                                 new NamedThreadFactory(name));
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=925440&r1=925439&r2=925440&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Fri Mar 19 21:32:22 2010
@@ -152,6 +152,8 @@ public class DatabaseDescriptor
         throw new RuntimeException("Cannot locate " + STORAGE_CONF_FILE + " via storage-config system property or classpath lookup.");
     }
 
+    private static int stageQueueSize_ = 4096;
+
     static
     {
         try
@@ -290,11 +292,20 @@ public class DatabaseDescriptor
             {
                 concurrentReaders = Integer.parseInt(rawReaders);
             }
+            if (concurrentReaders < 2)
+            {
+                throw new ConfigurationException("ConcurrentReads must be at least 2");
+            }
+
             String rawWriters = 
xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
             if (rawWriters != null)
             {
                 concurrentWriters = Integer.parseInt(rawWriters);
             }
+            if (concurrentWriters < 2)
+            {
+                throw new ConfigurationException("ConcurrentWrites must be at least 2");
+            }
 
             String rawFlushData = 
xmlUtils.getNodeValue("/Storage/FlushDataBufferSizeInMB");
             if (rawFlushData != null)
@@ -1092,6 +1103,11 @@ public class DatabaseDescriptor
         return getCFMetaData(tableName, cfName).subcolumnComparator;
     }
 
+    public static int getStageQueueSize()
+    {
+        return stageQueueSize_;
+    }
+
     /**
      * @return The absolute number of keys that should be cached per table.
      */


Reply via email to