Allow overriding available processors with -Dcassandra.available_processors
Patch by brandonwilliams, reviewed by iamaleksey for CASSANDRA-4790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6d4151e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6d4151e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6d4151e

Branch: refs/heads/trunk
Commit: d6d4151ee24791aa35a983de1c39cc4f895d6a55
Parents: 54ab2d2
Author: Brandon Williams <[email protected]>
Authored: Wed Dec 5 13:18:00 2012 -0600
Committer: Brandon Williams <[email protected]>
Committed: Wed Dec 5 14:46:11 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/concurrent/StageManager.java  |    9 +-
 src/java/org/apache/cassandra/config/Config.java   |    6 +-
 .../cassandra/config/DatabaseDescriptor.java       |    2 +-
 .../PeriodicCommitLogExecutorService.java          |    3 +-
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordWriter.java |    3 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |    4 +-
 .../apache/cassandra/thrift/CassandraDaemon.java   |  223 +++++++++++++++
 .../org/apache/cassandra/utils/FBUtilities.java    |    8 +
 10 files changed, 251 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 7ca45f4..bf2e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.net.MessagingService;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -41,21 +42,21 @@ public class StageManager
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" 
threads alive for when idle
 
-    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * 
Runtime.getRuntime().availableProcessors();
+    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * 
FBUtilities.getAvailableProcessors();
 
     static
     {
         stages.put(Stage.MUTATION, 
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, 
getConcurrentReaders()));
-        stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedStage(Stage.REQUEST_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
-        stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedStage(Stage.REQUEST_RESPONSE, 
FBUtilities.getAvailableProcessors()));
+        stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, 
FBUtilities.getAvailableProcessors()));
         stages.put(Stage.REPLICATE_ON_WRITE, 
multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, 
getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
         stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
         stages.put(Stage.ANTI_ENTROPY, new 
JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new 
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
-        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, 
Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, 
FBUtilities.getAvailableProcessors()));
         stages.put(Stage.TRACING, tracingExecutor());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e9f190f..609633c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,12 +21,15 @@ import org.apache.cassandra.cache.SerializingCacheProvider;
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.io.util.NativeAllocator;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * A class that contains configuration properties for the cassandra node it 
runs within.
  * 
  * Properties declared as volatile can be mutated via JMX.
  */
+
+
 public class Config
 {
     public String cluster_name = "Test Cluster";
@@ -101,8 +104,9 @@ public class Config
     /* if the size of columns or super-columns are more than this, indexing 
will kick in */
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 64;
-    public Integer concurrent_compactors = 
Runtime.getRuntime().availableProcessors();
+    public Integer concurrent_compactors = 
FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;
+    public Integer compaction_throughput_mb_per_sec = 16;
     public Boolean multithreaded_compaction = false;
 
     public Integer max_streaming_retries = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c624ba3..c57983a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -389,7 +389,7 @@ public class DatabaseDescriptor
             }
 
             if (conf.concurrent_compactors == null)
-                conf.concurrent_compactors = 
Runtime.getRuntime().availableProcessors();
+                conf.concurrent_compactors = 
FBUtilities.getAvailableProcessors();
 
             if (conf.concurrent_compactors <= 0)
                 throw new ConfigurationException("concurrent_compactors should 
be strictly greater than 0");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 39978ef..94f593e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
@@ -32,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * 
Runtime.getRuntime().availableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(1024 * 
FBUtilities.getAvailableProcessors());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 56fce20..b19395c 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -135,7 +135,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
         private final List<RowContainer> rows = new ArrayList<RowContainer>();
         private int row = 0;
 
-        private final ThreadPoolExecutor executor = new 
DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+        private final ThreadPoolExecutor executor = new 
DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
                                                                                
      Integer.MAX_VALUE,
                                                                                
      TimeUnit.MILLISECONDS,
                                                                                
      new SynchronousQueue<Runnable>(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 3b66976..909c291 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -101,7 +102,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     {
         this.conf = conf;
         this.ringCache = new RingCache(conf);
-        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * 
Runtime.getRuntime().availableProcessors());
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * 
FBUtilities.getAvailableProcessors());
         this.clients = new HashMap<Range,RangeClient>();
         batchThreshold = 
conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         consistencyLevel = 
ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c1c751c..45a1f7a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -225,7 +225,7 @@ public class SSTableReader extends SSTable
     {
         final Collection<SSTableReader> sstables = new 
LinkedBlockingQueue<SSTableReader>();
 
-        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", 
Runtime.getRuntime().availableProcessors());
+        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", 
FBUtilities.getAvailableProcessors());
         for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
         {
             Runnable runnable = new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0c3eae9..a3b95ff 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -79,7 +79,9 @@ public class StorageProxy implements StorageProxyMBean
 
     public static final StorageProxy instance = new StorageProxy();
 
-    private static volatile int maxHintsInProgress = 1024 * 
Runtime.getRuntime().availableProcessors();
+    private static volatile boolean hintedHandoffEnabled = 
DatabaseDescriptor.hintedHandoffEnabled();
+    private static volatile int maxHintWindow = 
DatabaseDescriptor.getMaxHintWindow();
+    private static volatile int maxHintsInProgress = 1024 * 
FBUtilities.getAvailableProcessors();
     private static final AtomicInteger totalHintsInProgress = new 
AtomicInteger();
     private static final Map<InetAddress, AtomicInteger> hintsInProgress = new 
MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, 
AtomicInteger>()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
new file mode 100644
index 0000000..572e3e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.thrift;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * This class supports two methods for creating a Cassandra node daemon,
+ * invoking the class's main method, and using the jsvc wrapper from
+ * commons-daemon, (for more information on using this class with the
+ * jsvc wrapper, see the
+ * <a href="http://commons.apache.org/daemon/jsvc.html">Commons Daemon</a>
+ * documentation).
+ */
+
+public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassandraDaemon
+{
+    protected static CassandraDaemon instance;
+
+    static
+    {
+        AbstractCassandraDaemon.initLog4j();
+    }
+
+    private static Logger logger = 
LoggerFactory.getLogger(CassandraDaemon.class);
+    private final static String SYNC = "sync";
+    private final static String ASYNC = "async";
+    private final static String HSHA = "hsha";
+    public final static List<String> rpc_server_types = Arrays.asList(SYNC, 
ASYNC, HSHA);
+    private ThriftServer server;
+
+    protected void startServer()
+    {
+        if (server == null)
+        {
+            server = new ThriftServer(listenAddr, listenPort);
+            server.start();
+        }
+    }
+
+    protected void stopServer()
+    {
+        if (server != null)
+        {
+            server.stopServer();
+            try
+            {
+                server.join();
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Interrupted while waiting thrift server to 
stop", e);
+            }
+            server = null;
+        }
+    }
+
+    public static void stop(String[] args)
+    {
+        instance.stopServer();
+        instance.deactivate();
+    }
+
+    public static void main(String[] args)
+    {
+        instance = new CassandraDaemon();
+        instance.activate();
+    }
+
+    /**
+     * Simple class to run the thrift connection accepting code in separate
+     * thread of control.
+     */
+    private static class ThriftServer extends Thread
+    {
+        private TServer serverEngine;
+
+        public ThriftServer(InetAddress listenAddr, int listenPort)
+        {
+            // now we start listening for clients
+            final CassandraServer cassandraServer = new CassandraServer();
+            Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
+
+            // Transport
+            logger.info(String.format("Binding thrift service to %s:%s", 
listenAddr, listenPort));
+
+            // Protocol factory
+            TProtocolFactory tProtocolFactory = new 
TBinaryProtocol.Factory(true, true, 
DatabaseDescriptor.getThriftMaxMessageLength());
+
+            // Transport factory
+            int tFramedTransportSize = 
DatabaseDescriptor.getThriftFramedTransportSize();
+            TTransportFactory inTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory outTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            logger.info("Using TFastFramedTransport with a max frame size of 
{} bytes.", tFramedTransportSize);
+
+            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+            {
+                TServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
+                                                              
DatabaseDescriptor.getRpcKeepAlive(),
+                                                              
DatabaseDescriptor.getRpcSendBufferSize(),
+                                                              
DatabaseDescriptor.getRpcRecvBufferSize());
+                }
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+                // ThreadPool Server and will be invocation per connection 
basis...
+                TThreadPoolServer.Args serverArgs = new 
TThreadPoolServer.Args(serverTransport)
+                                                                         
.minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                         
.maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                         
.inputTransportFactory(inTransportFactory)
+                                                                         
.outputTransportFactory(outTransportFactory)
+                                                                         
.inputProtocolFactory(tProtocolFactory)
+                                                                         
.outputProtocolFactory(tProtocolFactory)
+                                                                         
.processor(processor);
+                ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, 
serverArgs.maxWorkerThreads);
+                serverEngine = new CustomTThreadPoolServer(serverArgs, 
executorService);
+                logger.info(String.format("Using synchronous/threadpool thrift 
server on %s : %s", listenAddr, listenPort));
+            }
+            else
+            {
+                TNonblockingServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomNonblockingServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
+                                                                             
DatabaseDescriptor.getRpcKeepAlive(),
+                                                                             
DatabaseDescriptor.getRpcSendBufferSize(),
+                                                                             
DatabaseDescriptor.getRpcRecvBufferSize());
+                }
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+
+                if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+                {
+                    // This is single threaded hence the invocation will be all
+                    // in one thread.
+                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                               
                      .outputTransportFactory(outTransportFactory)
+                                                                               
                      .inputProtocolFactory(tProtocolFactory)
+                                                                               
                      .outputProtocolFactory(tProtocolFactory)
+                                                                               
                      .processor(processor);
+                    logger.info(String.format("Using non-blocking/asynchronous 
thrift server on %s : %s", listenAddr, listenPort));
+                    serverEngine = new CustomTNonBlockingServer(serverArgs);
+                }
+                else if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+                {
+                    // This is NIO selector service but the invocation will be 
Multi-Threaded with the Executor service.
+                    ExecutorService executorService = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                               
        DatabaseDescriptor.getRpcMaxThreads(),
+                                                                               
        60L,
+                                                                               
        TimeUnit.SECONDS,
+                                                                               
        new SynchronousQueue<Runnable>(),
+                                                                               
        new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                               
        .outputTransportFactory(outTransportFactory)
+                                                                               
        .inputProtocolFactory(tProtocolFactory)
+                                                                               
        .outputProtocolFactory(tProtocolFactory)
+                                                                               
        .processor(processor);
+                    logger.info(String.format("Using custom 
half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+                    // Check for available processors in the system which will 
be equal to the IO Threads.
+                    serverEngine = new CustomTHsHaServer(serverArgs, 
executorService, FBUtilities.getAvailableProcessors());
+                }
+            }
+        }
+
+        public void run()
+        {
+            logger.info("Listening for thrift clients...");
+            serverEngine.serve();
+        }
+
+        public void stopServer()
+        {
+            logger.info("Stop listening to thrift clients");
+            serverEngine.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index bc910bd..82fab71 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -69,6 +69,14 @@ public class FBUtilities
     private static volatile InetAddress localInetAddress;
     private static volatile InetAddress broadcastInetAddress;
 
+    public static int getAvailableProcessors()
+    {
+        if (System.getProperty("cassandra.available_processors") != null)
+            return 
Integer.parseInt(System.getProperty("cassandra.available_processors"));
+        else
+            return Runtime.getRuntime().availableProcessors();
+    }
+
     private static final ThreadLocal<MessageDigest> localMD5Digest = new 
ThreadLocal<MessageDigest>()
     {
         @Override

Reply via email to