Repository: spark Updated Branches: refs/heads/branch-1.5 93a0510a5 -> 3868ab644
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5) backport #10108 to branch 1.5 Author: Shixiong Zhu <shixi...@databricks.com> Closes #10135 from zsxwing/fix-threadpool-1.5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3868ab64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3868ab64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3868ab64 Branch: refs/heads/branch-1.5 Commit: 3868ab644cc87ce68ba1605f6da65c5e951ce412 Parents: 93a0510 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Mon Dec 7 12:04:18 2015 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Mon Dec 7 12:04:18 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/deploy/client/AppClient.scala | 10 ++++------ .../org/apache/spark/deploy/worker/Worker.scala | 10 ++++------ .../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +++++-------------- 3 files changed, 13 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea692..bd28429 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -66,12 +66,10 @@ private[spark] class AppClient( // A thread pool for registering with masters. Because registering with a master is a blocking // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same // time so that we can register with all masters. - private val registerMasterThreadPool = new ThreadPoolExecutor( - 0, - masterRpcAddresses.size, // Make sure we can register with all masters at the same time - 60L, TimeUnit.SECONDS, - new SynchronousQueue[Runnable](), - ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) + private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "appclient-register-master-threadpool", + masterRpcAddresses.length // Make sure we can register with all masters at the same time + ) // A scheduled executor for scheduling the registration actions private val registrationRetryThread = http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 79b1536..a898bb1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -147,12 +147,10 @@ private[deploy] class Worker( // A thread pool for registering with masters. Because registering with a master is a blocking // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same // time so that we can register with all masters. - private val registerMasterThreadPool = new ThreadPoolExecutor( - 0, - masterRpcAddresses.size, // Make sure we can register with all masters at the same time - 60L, TimeUnit.SECONDS, - new SynchronousQueue[Runnable](), - ThreadUtils.namedThreadFactory("worker-register-master-threadpool")) + private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "worker-register-master-threadpool", + masterRpcAddresses.size // Make sure we can register with all masters at the same time + ) var coresUsed = 0 var memoryUsed = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 6a02848..52a3fd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -21,26 +21,21 @@ import java.util.Collections import java.util.concurrent._ import java.util.regex.Pattern -import org.apache.spark.util.Utils - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import com.google.common.util.concurrent.ThreadFactoryBuilder - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.RackResolver - import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor +import org.apache.spark.util.ThreadUtils /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -108,13 +103,9 @@ private[yarn] class YarnAllocator( // Resource capability requested for each executors private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) - private val launcherPool = new ThreadPoolExecutor( - // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue - sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, - 1, TimeUnit.MINUTES, - new LinkedBlockingQueue[Runnable](), - new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) - launcherPool.allowCoreThreadTimeOut(true) + private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( + "ContainerLauncher", + sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25)) // For testing private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org