Repository: spark
Updated Branches:
  refs/heads/master d9956f86a -> 3720057b8


[SPARK-3607] ConnectionManager threads.max configs on the thread pools don't 
work

Hi all - I cleaned up the code to remove the unused threads.max parameters and 
added comments on the ThreadPoolExecutor parameters explaining why a single 
thread count can be used instead of separate min/max values.

Author: Ilya Ganelin <[email protected]>

Closes #3664 from ilganeli/SPARK-3607C and squashes the following commits:

3c05690 [Ilya Ganelin] Updated documentation and refactored code to extract 
shared variables


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3720057b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3720057b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3720057b

Branch: refs/heads/master
Commit: 3720057b8e7c15c2c0464b5bb7243bc22323f4e8
Parents: d9956f8
Author: Ilya Ganelin <[email protected]>
Authored: Thu Dec 18 12:53:18 2014 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Dec 18 12:53:18 2014 -0800

----------------------------------------------------------------------
 .../spark/network/nio/ConnectionManager.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3720057b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index df4b085..243b71c 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -83,9 +83,21 @@ private[nio] class ConnectionManager(
 
   private val ackTimeout = 
conf.getInt("spark.core.connection.ack.wait.timeout", 60)
 
+  // Get the thread counts from the Spark Configuration.
+  // 
+  // Even though the ThreadPoolExecutor constructor takes both a minimum and 
maximum value,
+  // we only query for the minimum value because we are using 
LinkedBlockingDeque.
+  // 
+  // The JavaDoc for ThreadPoolExecutor points out that when using a 
LinkedBlockingDeque (which is 
+  // an unbounded queue) no more than corePoolSize threads will ever be 
created, so only the "min"
+  // parameter is necessary.
+  private val handlerThreadCount = 
conf.getInt("spark.core.connection.handler.threads.min", 20)
+  private val ioThreadCount = 
conf.getInt("spark.core.connection.io.threads.min", 4)
+  private val connectThreadCount = 
conf.getInt("spark.core.connection.connect.threads.min", 1)
+
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.handler.threads.min", 20),
-    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    handlerThreadCount,
+    handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), 
TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-message-executor")) {
@@ -96,12 +108,11 @@ private[nio] class ConnectionManager(
         logError("Error in handleMessageExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.io.threads.min", 4),
-    conf.getInt("spark.core.connection.io.threads.max", 32),
+    ioThreadCount,
+    ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), 
TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-read-write-executor")) {
@@ -112,14 +123,13 @@ private[nio] class ConnectionManager(
         logError("Error in handleReadWriteExecutor is not handled properly", t)
       }
     }
-
   }
 
   // Use a different, yet smaller, thread pool - infrequently used with very 
short lived tasks :
   // which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.connect.threads.min", 1),
-    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    connectThreadCount,
+    connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), 
TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-connect-executor")) {
@@ -130,7 +140,6 @@ private[nio] class ConnectionManager(
         logError("Error in handleConnectExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val serverChannel = ServerSocketChannel.open()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to