Allow overriding available processors with -Dcassandra.available_processors. Patch by brandonwilliams; reviewed by iamaleksey for CASSANDRA-4790.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6d4151e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6d4151e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6d4151e Branch: refs/heads/trunk Commit: d6d4151ee24791aa35a983de1c39cc4f895d6a55 Parents: 54ab2d2 Author: Brandon Williams <[email protected]> Authored: Wed Dec 5 13:18:00 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Wed Dec 5 14:46:11 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/concurrent/StageManager.java | 9 +- src/java/org/apache/cassandra/config/Config.java | 6 +- .../cassandra/config/DatabaseDescriptor.java | 2 +- .../PeriodicCommitLogExecutorService.java | 3 +- .../db/compaction/ParallelCompactionIterable.java | 2 +- .../cassandra/hadoop/ColumnFamilyRecordWriter.java | 3 +- .../apache/cassandra/io/sstable/SSTableReader.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 4 +- .../apache/cassandra/thrift/CassandraDaemon.java | 223 +++++++++++++++ .../org/apache/cassandra/utils/FBUtilities.java | 8 + 10 files changed, 251 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 7ca45f4..bf2e4c2 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; import static org.apache.cassandra.config.DatabaseDescriptor.*; +import 
org.apache.cassandra.utils.FBUtilities; /** @@ -41,21 +42,21 @@ public class StageManager public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle - public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors(); + public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors(); static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); - stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors())); - stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors())); + stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors())); + stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors())); stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP)); stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY)); stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION)); stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC)); - stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors())); + stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors())); stages.put(Stage.TRACING, tracingExecutor()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/Config.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index e9f190f..609633c 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -21,12 +21,15 @@ import org.apache.cassandra.cache.SerializingCacheProvider; import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.io.util.NativeAllocator; +import org.apache.cassandra.utils.FBUtilities; /** * A class that contains configuration properties for the cassandra node it runs within. * * Properties declared as volatile can be mutated via JMX. */ + + public class Config { public String cluster_name = "Test Cluster"; @@ -101,8 +104,9 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; public Integer in_memory_compaction_limit_in_mb = 64; - public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors(); + public Integer concurrent_compactors = FBUtilities.getAvailableProcessors(); public volatile Integer compaction_throughput_mb_per_sec = 16; + public Integer compaction_throughput_mb_per_sec = 16; public Boolean multithreaded_compaction = false; public Integer max_streaming_retries = 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c624ba3..c57983a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -389,7 
+389,7 @@ public class DatabaseDescriptor } if (conf.concurrent_compactors == null) - conf.concurrent_compactors = Runtime.getRuntime().availableProcessors(); + conf.concurrent_compactors = FBUtilities.getAvailableProcessors(); if (conf.concurrent_compactors <= 0) throw new ConfigurationException("concurrent_compactors should be strictly greater than 0"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java index 39978ef..94f593e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; class PeriodicCommitLogExecutorService implements ICommitLogExecutorService @@ -32,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService public PeriodicCommitLogExecutorService(final CommitLog commitLog) { - queue = new LinkedBlockingQueue<Runnable>(1024 * Runtime.getRuntime().availableProcessors()); + queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors()); Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 56fce20..b19395c 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -135,7 +135,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private final List<RowContainer> rows = new ArrayList<RowContainer>(); private int row = 0; - private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 3b66976..909c291 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -30,6 +30,7 @@ import org.apache.cassandra.client.RingCache; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.RecordWriter; @@ -101,7 +102,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { this.conf = conf; this.ringCache = new RingCache(conf); - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()); + this.queueSize = 
conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); this.clients = new HashMap<Range,RangeClient>(); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index c1c751c..45a1f7a 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -225,7 +225,7 @@ public class SSTableReader extends SSTable { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); - ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); + ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors()); for (final Map.Entry<Descriptor, Set<Component>> entry : entries) { Runnable runnable = new Runnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0c3eae9..a3b95ff 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -79,7 +79,9 @@ public class StorageProxy implements StorageProxyMBean public static final StorageProxy instance = new StorageProxy(); - private static volatile int maxHintsInProgress = 
1024 * Runtime.getRuntime().availableProcessors(); + private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled(); + private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); + private static volatile int maxHintsInProgress = 1024 * FBUtilities.getAvailableProcessors(); private static final AtomicInteger totalHintsInProgress = new AtomicInteger(); private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/thrift/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java new file mode 100644 index 0000000..572e3e0 --- /dev/null +++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.thrift; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.service.AbstractCassandraDaemon; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +/** + * This class supports two methods for creating a Cassandra node daemon, + * invoking the class's main method, and using the jsvc wrapper from + * commons-daemon, (for more information on using this class with the + * jsvc wrapper, see the + * <a href="http://commons.apache.org/daemon/jsvc.html">Commons Daemon</a> + * documentation). 
+ */ + +public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon +{ + protected static CassandraDaemon instance; + + static + { + AbstractCassandraDaemon.initLog4j(); + } + + private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class); + private final static String SYNC = "sync"; + private final static String ASYNC = "async"; + private final static String HSHA = "hsha"; + public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA); + private ThriftServer server; + + protected void startServer() + { + if (server == null) + { + server = new ThriftServer(listenAddr, listenPort); + server.start(); + } + } + + protected void stopServer() + { + if (server != null) + { + server.stopServer(); + try + { + server.join(); + } + catch (InterruptedException e) + { + logger.error("Interrupted while waiting thrift server to stop", e); + } + server = null; + } + } + + public static void stop(String[] args) + { + instance.stopServer(); + instance.deactivate(); + } + + public static void main(String[] args) + { + instance = new CassandraDaemon(); + instance.activate(); + } + + /** + * Simple class to run the thrift connection accepting code in separate + * thread of control. 
+ */ + private static class ThriftServer extends Thread + { + private TServer serverEngine; + + public ThriftServer(InetAddress listenAddr, int listenPort) + { + // now we start listening for clients + final CassandraServer cassandraServer = new CassandraServer(); + Cassandra.Processor processor = new Cassandra.Processor(cassandraServer); + + // Transport + logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort)); + + // Protocol factory + TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength()); + + // Transport factory + int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize(); + TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); + TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); + logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize); + + if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC)) + { + TServerTransport serverTransport; + try + { + serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), + DatabaseDescriptor.getRpcKeepAlive(), + DatabaseDescriptor.getRpcSendBufferSize(), + DatabaseDescriptor.getRpcRecvBufferSize()); + } + catch (TTransportException e) + { + throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e); + } + // ThreadPool Server and will be invocation per connection basis... 
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport) + .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads()) + .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads()) + .inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads); + serverEngine = new CustomTThreadPoolServer(serverArgs, executorService); + logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort)); + } + else + { + TNonblockingServerTransport serverTransport; + try + { + serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort), + DatabaseDescriptor.getRpcKeepAlive(), + DatabaseDescriptor.getRpcSendBufferSize(), + DatabaseDescriptor.getRpcRecvBufferSize()); + } + catch (TTransportException e) + { + throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e); + } + + if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC)) + { + // This is single threaded hence the invocation will be all + // in one thread. + TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort)); + serverEngine = new CustomTNonBlockingServer(serverArgs); + } + else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA)) + { + // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service. 
+ ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(), + DatabaseDescriptor.getRpcMaxThreads(), + 60L, + TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL"); + TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort)); + // Check for available processors in the system which will be equal to the IO Threads. + serverEngine = new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors()); + } + } + } + + public void run() + { + logger.info("Listening for thrift clients..."); + serverEngine.serve(); + } + + public void stopServer() + { + logger.info("Stop listening to thrift clients"); + serverEngine.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6d4151e/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index bc910bd..82fab71 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -69,6 +69,14 @@ public class FBUtilities private static volatile InetAddress localInetAddress; private static volatile InetAddress broadcastInetAddress; + public static int getAvailableProcessors() + { + if (System.getProperty("cassandra.available_processors") != null) + return Integer.parseInt(System.getProperty("cassandra.available_processors")); + else + return Runtime.getRuntime().availableProcessors(); + } + private static final 
ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>() { @Override
