Updated Branches: refs/heads/cassandra-2.0.0 65cd5801f -> ddcd54a3b
Fix potential deadlock in the native protocol patch by slebresne; reviewed by jbellis for CASSANDRA-5926 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/02d9238e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/02d9238e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/02d9238e Branch: refs/heads/cassandra-2.0.0 Commit: 02d9238e2ac664ded1db7b923201bd4d9730f9b3 Parents: 17186d8 Author: Sylvain Lebresne <[email protected]> Authored: Mon Aug 26 17:35:19 2013 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Mon Aug 26 17:36:47 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 2 + .../transport/RequestThreadPoolExecutor.java | 46 ++++++++++++++++++-- 3 files changed, 45 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/02d9238e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e887c27..4246c30 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,7 @@ * Properly handle parsing huge map and set literals (CASSANDRA-5893) * Fix LCS L0 compaction may overlap in L1 (CASSANDRA-5907) * New sstablesplit tool to split large sstables offline (CASSANDRA-4766) + * Fix potential deadlock in native protocol server (CASSANDRA-5926) Merged from 1.1: * Correctly validate sparse composite cells in scrub (CASSANDRA-5855) http://git-wip-us.apache.org/repos/asf/cassandra/blob/02d9238e/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 49e9874..28c43ba 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -342,6 +342,8 @@ native_transport_port: 9042 # The minimum and maximum threads for handling requests when the 
native # transport is used. They are similar to rpc_min_threads and rpc_max_threads, # though the defaults differ slightly. +# NOTE: native_transport_min_threads is now deprecated and ignored (but kept +# in the 1.2.x series for compatibility's sake). # native_transport_min_threads: 16 # native_transport_max_threads: 128 http://git-wip-us.apache.org/repos/asf/cassandra/blob/02d9238e/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java index faae28f..d266387 100644 --- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java @@ -20,20 +20,58 @@ package org.apache.cassandra.transport; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor; +import org.jboss.netty.util.ObjectSizeEstimator; + import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; -public class RequestThreadPoolExecutor extends DebuggableThreadPoolExecutor +public class RequestThreadPoolExecutor extends MemoryAwareThreadPoolExecutor { private final static int CORE_THREAD_TIMEOUT_SEC = 30; + // Number of requests we accept to queue before blocking. We could allow this to be configured... 
+ private final static int MAX_QUEUED_REQUESTS = 128; public RequestThreadPoolExecutor() { - super(DatabaseDescriptor.getNativeTransportMinThreads(), - DatabaseDescriptor.getNativeTransportMaxThreads(), + super(DatabaseDescriptor.getNativeTransportMaxThreads(), + 0, // We don't use the per-channel limit, only the global one + MAX_QUEUED_REQUESTS, CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(32), // Seems to help smooth latency compared to SynchronousQueue. + sizeEstimator(), new NamedThreadFactory("Native-Transport-Requests")); } + + /* + * In theory, the ObjectSizeEstimator should estimate the actual size of a + * request, and MemoryAwareThreadPoolExecutor sets a memory limit on how + * much memory we allow for requests before blocking. + * + * However, the memory size used by a CQL query is not very interesting and + * by no means reflects the memory size its execution will use (the interesting part). + * Furthermore, we're mainly interested in limiting the number of unhandled requests that + * pile up to implement some back-pressure, and for that, there is no real need to do + * fancy estimation of request size. So we use a trivial estimator that just counts the + * number of requests. + * + * We could get more fancy later ... + */ + private static ObjectSizeEstimator sizeEstimator() + { + return new ObjectSizeEstimator() + { + public int estimateSize(Object o) + { + return 1; + } + }; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) + { + super.afterExecute(r, t); + DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); + } }
