Repository: spark
Updated Branches:
  refs/heads/branch-1.5 93a0510a5 -> 3868ab644


[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5)

Backport #10108 to branch-1.5.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10135 from zsxwing/fix-threadpool-1.5.
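
For context on the bug: a ThreadPoolExecutor with a core size of 0 and a
SynchronousQueue has no capacity to hold pending tasks, so once
maximumPoolSize threads are busy any further submission is rejected
outright. A minimal standalone sketch (not part of this commit) that
reproduces the failure mode:

import java.util.concurrent.{RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object SynchronousQueueDemo {
  def main(args: Array[String]): Unit = {
    // Same shape as the pools this commit removes: core size 0, a small
    // maximum, and a SynchronousQueue that cannot hold any waiting task.
    val pool = new ThreadPoolExecutor(
      0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
    try {
      // Tasks 1 and 2 each get a fresh thread; task 3 finds both threads
      // busy and no queue capacity, so execute() throws.
      for (i <- 1 to 3) {
        pool.execute(new Runnable {
          override def run(): Unit = Thread.sleep(10000)
        })
        println(s"task $i accepted")
      }
    } catch {
      case e: RejectedExecutionException =>
        println(s"task rejected: $e") // the failure SPARK-12101 addresses
    } finally {
      pool.shutdownNow()
    }
  }
}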


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3868ab64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3868ab64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3868ab64

Branch: refs/heads/branch-1.5
Commit: 3868ab644cc87ce68ba1605f6da65c5e951ce412
Parents: 93a0510
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 7 12:04:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Dec 7 12:04:18 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/client/AppClient.scala   | 10 ++++------
 .../org/apache/spark/deploy/worker/Worker.scala  | 10 ++++------
 .../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +++++--------------
 3 files changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 25ea692..bd28429 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -66,12 +66,10 @@ private[spark] class AppClient(
     // A thread pool for registering with masters. Because registering with a master is a blocking
     // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
     // time so that we can register with all masters.
-    private val registerMasterThreadPool = new ThreadPoolExecutor(
-      0,
-      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-      60L, TimeUnit.SECONDS,
-      new SynchronousQueue[Runnable](),
-      ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+    private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "appclient-register-master-threadpool",
+      masterRpcAddresses.length // Make sure we can register with all masters at the same time
+    )
 
     // A scheduled executor for scheduling the registration actions
     private val registrationRetryThread =
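
For reference, ThreadUtils.newDaemonCachedThreadPool comes from the
original patch (#10108). A sketch of what it is assumed to construct (the
Spark ThreadUtils source is authoritative): a pool whose core and maximum
sizes are both capped at the requested thread count, backed by an
unbounded LinkedBlockingQueue so extra tasks are queued ("cached") rather
than rejected:

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Sketch only; see org.apache.spark.util.ThreadUtils for the real helper.
def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
  val threadFactory = new ThreadFactoryBuilder()
    .setDaemon(true)
    .setNameFormat(prefix + "-%d")
    .build()
  val pool = new ThreadPoolExecutor(
    maxThreadNumber, // corePoolSize: threads created eagerly before queueing
    maxThreadNumber, // maximumPoolSize: effectively unused with an unbounded queue
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](), // queues excess tasks instead of rejecting
    threadFactory)
  // Allow core threads to time out so an idle pool shrinks to zero threads.
  pool.allowCoreThreadTimeOut(true)
  pool
}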

http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 79b1536..a898bb1 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -147,12 +147,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "worker-register-master-threadpool",
+    masterRpcAddresses.size // Make sure we can register with all masters at the same time
+  )
 
   var coresUsed = 0
   var memoryUsed = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6a02848..52a3fd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,26 +21,21 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
-import org.apache.spark.util.Utils
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.RackResolver
-
 import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.util.ThreadUtils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -108,13 +103,9 @@ private[yarn] class YarnAllocator(
   // Resource capability requested for each executors
   private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
 
-  private val launcherPool = new ThreadPoolExecutor(
-    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
-    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
-    1, TimeUnit.MINUTES,
-    new LinkedBlockingQueue[Runnable](),
-    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
-  launcherPool.allowCoreThreadTimeOut(true)
+  private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
+    "ContainerLauncher",
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
 
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
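
Assuming the sketch of newDaemonCachedThreadPool given earlier in this
message, the behavioral difference is easy to check: a burst of
submissions beyond the thread cap is now queued instead of throwing
RejectedExecutionException. A hypothetical check:

import java.util.concurrent.TimeUnit

// Hypothetical: three tasks on a pool capped at two threads are all
// accepted; the third waits in the queue until a thread frees up.
val pool = newDaemonCachedThreadPool("demo", 2)
(1 to 3).foreach { i =>
  pool.execute(new Runnable {
    override def run(): Unit =
      println(s"task $i ran on ${Thread.currentThread().getName}")
  })
}
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)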

