Cleanup DTPE usage after CASSANDRA-3494

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/263f192b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/263f192b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/263f192b

Branch: refs/heads/trunk
Commit: 263f192b65db18e3cfb5126e86358f374a0be5fa
Parents: 3a04d85
Author: Sylvain Lebresne <[email protected]>
Authored: Wed May 2 14:29:34 2012 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed May 2 14:29:34 2012 +0200

----------------------------------------------------------------------
 .../concurrent/DebuggableThreadPoolExecutor.java   |   27 +++++++++++++-
 .../apache/cassandra/io/sstable/SSTableReader.java |    2 +-
 .../org/apache/cassandra/net/MessagingService.java |    7 +---
 3 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java 
b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 74b2ad9..f85a9ba 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -98,9 +98,32 @@ public class DebuggableThreadPoolExecutor extends 
ThreadPoolExecutor
         this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
-    public static DebuggableThreadPoolExecutor createWithPoolSize(String 
threadPoolName, int size)
+    /**
+     * Returns a ThreadPoolExecutor with a fixed number of threads.
+     * When all threads are actively executing tasks, new tasks are queued.
+     * If (most) threads are expected to be idle most of the time, prefer 
createWithMaximumPoolSize() instead.
+     * @param threadPoolName the name of the threads created by this executor
+     * @param size the fixed number of threads for this executor
+     * @return the new DebuggableThreadPoolExecutor
+     */
+    public static DebuggableThreadPoolExecutor createWithFixedPoolSize(String 
threadPoolName, int size)
+    {
+        return createWithMaximumPoolSize(threadPoolName, size, 
Integer.MAX_VALUE, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns a ThreadPoolExecutor with a fixed maximum number of threads, 
but whose
+     * threads are terminated when idle for too long.
+     * When all threads are actively executing tasks, new tasks are queued.
+     * @param threadPoolName the name of the threads created by this executor
+     * @param size the maximum number of threads for this executor
+     * @param keepAliveTime the time an idle thread is kept alive before being 
terminated
+     * @param unit the time unit for {@code keepAliveTime}
+     * @return the new DebuggableThreadPoolExecutor
+     */
+    public static DebuggableThreadPoolExecutor 
createWithMaximumPoolSize(String threadPoolName, int size, int keepAliveTime, 
TimeUnit unit)
     {
-        return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(threadPoolName));
+        return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, 
keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(threadPoolName));
     }
 
     protected void onInitialRejection(Runnable task) {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index b4fed4a..c332ae6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -212,7 +212,7 @@ public class SSTableReader extends SSTable
     {
         final Collection<SSTableReader> sstables = new 
LinkedBlockingQueue<SSTableReader>();
 
-        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", 
Runtime.getRuntime().availableProcessors());
+        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", 
Runtime.getRuntime().availableProcessors());
         for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
         {
             Runnable runnable = new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/263f192b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index c8942b1..e5be891 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -474,12 +474,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         if (executor == null)
         {
             // Using a core pool size of 0 is important. See documentation of 
streamExecutors.
-            executor = new DebuggableThreadPoolExecutor(0,
-                                                        1,
-                                                        1,
-                                                        TimeUnit.SECONDS,
-                                                        new 
LinkedBlockingQueue<Runnable>(),
-                                                        new 
NamedThreadFactory("Streaming to " + to));
+            executor = 
DebuggableThreadPoolExecutor.createWithMaximumPoolSize("Streaming to " + to, 1, 
1, TimeUnit.SECONDS);
             DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, 
executor);
             if (old != null)
             {

Reply via email to