Cleanup DTPE usage after CASSANDRA-3494
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/263f192b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/263f192b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/263f192b Branch: refs/heads/trunk Commit: 263f192b65db18e3cfb5126e86358f374a0be5fa Parents: 3a04d85 Author: Sylvain Lebresne <[email protected]> Authored: Wed May 2 14:29:34 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Wed May 2 14:29:34 2012 +0200 ---------------------------------------------------------------------- .../concurrent/DebuggableThreadPoolExecutor.java | 27 +++++++++++++- .../apache/cassandra/io/sstable/SSTableReader.java | 2 +- .../org/apache/cassandra/net/MessagingService.java | 7 +--- 3 files changed, 27 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 74b2ad9..f85a9ba 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -98,9 +98,32 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor this.setRejectedExecutionHandler(blockingExecutionHandler); } - public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size) + /** + * Returns a ThreadPoolExecutor with a fixed number of threads. + * When all threads are actively executing tasks, new tasks are queued. + * If (most) threads are expected to be idle most of the time, prefer createWithMaximumPoolSize() instead. 
+ * @param threadPoolName the name of the threads created by this executor + * @param size the fixed number of threads for this executor + * @return the new DebuggableThreadPoolExecutor + */ + public static DebuggableThreadPoolExecutor createWithFixedPoolSize(String threadPoolName, int size) + { + return createWithMaximumPoolSize(threadPoolName, size, Integer.MAX_VALUE, TimeUnit.SECONDS); + } + + /** + * Returns a ThreadPoolExecutor with a fixed maximum number of threads, but whose + * threads are terminated when idle for too long. + * When all threads are actively executing tasks, new tasks are queued. + * @param threadPoolName the name of the threads created by this executor + * @param size the maximum number of threads for this executor + * @param keepAliveTime the time an idle thread is kept alive before being terminated + * @param unit the time unit for {@code keepAliveTime} + * @return the new DebuggableThreadPoolExecutor + */ + public static DebuggableThreadPoolExecutor createWithMaximumPoolSize(String threadPoolName, int size, int keepAliveTime, TimeUnit unit) { - return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName)); + return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName)); } protected void onInitialRejection(Runnable task) {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index b4fed4a..c332ae6 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -212,7 +212,7 @@ public class 
SSTableReader extends SSTable { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); - ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); + ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); for (final Map.Entry<Descriptor, Set<Component>> entry : entries) { Runnable runnable = new Runnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c8942b1..e5be891 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -474,12 +474,7 @@ public final class MessagingService implements MessagingServiceMBean if (executor == null) { // Using a core pool size of 0 is important. See documentation of streamExecutors. - executor = new DebuggableThreadPoolExecutor(0, - 1, - 1, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("Streaming to " + to)); + executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("Streaming to " + to, 1, 1, TimeUnit.SECONDS); DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, executor); if (old != null) {
