Repository: spark
Updated Branches:
  refs/heads/branch-2.2 75544c019 -> c86c078c1


[SPARK-20483] Mesos Coarse mode may starve other Mesos frameworks

## What changes were proposed in this pull request?

Set maxCores to be a multiple of the size of the smallest executor that can be 
launched. This ensures that we correctly detect the condition where no more 
executors will be launched when spark.cores.max is not a multiple of spark.executor.cores.

## How was this patch tested?

This was manually tested with other sample frameworks measuring their incoming 
offers to determine if starvation would occur.

dbtsai mgummelt

Author: Davis Shepherd <dsheph...@netflix.com>

Closes #17786 from dgshep/fix_mesos_max_cores.

(cherry picked from commit 7633933e54ffb08ab9d959be5f76c26fae29d1d9)
Signed-off-by: DB Tsai <dbt...@dbtsai.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c86c078c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c86c078c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c86c078c

Branch: refs/heads/branch-2.2
Commit: c86c078c1e016ff9dd7df26da9febe2a53f905d1
Parents: 75544c0
Author: Davis Shepherd <dsheph...@netflix.com>
Authored: Thu Apr 27 18:06:12 2017 +0000
Committer: DB Tsai <dbt...@dbtsai.com>
Committed: Thu Apr 27 18:06:31 2017 +0000

----------------------------------------------------------------------
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala   | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c86c078c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 2a36ec4..8f5b97c 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -60,8 +60,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
+  private val executorCoresOption = 
conf.getOption("spark.executor.cores").map(_.toInt)
+
+  private val minCoresPerExecutor = executorCoresOption.getOrElse(1)
+
   // Maximum number of cores to acquire
-  private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
+  private val maxCores = {
+    val cores = maxCoresOption.getOrElse(Int.MaxValue)
+    // Set maxCores to a multiple of smallest executor we can launch
+    cores - (cores % minCoresPerExecutor)
+  }
 
   private val useFetcherCache = 
conf.getBoolean("spark.mesos.fetcherCache.enable", false)
 
@@ -489,8 +497,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def executorCores(offerCPUs: Int): Int = {
-    sc.conf.getInt("spark.executor.cores",
-      math.min(offerCPUs, maxCores - totalCoresAcquired))
+    executorCoresOption.getOrElse(
+      math.min(offerCPUs, maxCores - totalCoresAcquired)
+    )
   }
 
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: 
TaskStatus) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to