Repository: spark Updated Branches: refs/heads/branch-1.6 6935b5080 -> 765307f41
[SPARK-13803] restore the changes in SPARK-3411 ## What changes were proposed in this pull request? This patch contains the functionality to balance the load of the cluster-mode drivers among workers. This patch restores the changes in https://github.com/apache/spark/pull/1106 which were erased due to the merging of https://github.com/apache/spark/pull/731 ## How was this patch tested? Tested with existing test cases. Author: CodingCat <zhunans...@gmail.com> Closes #11702 from CodingCat/SPARK-13803. (cherry picked from commit bd5365bbe9ff6518cde9402ee8843ec1002fff5b) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/765307f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/765307f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/765307f4 Branch: refs/heads/branch-1.6 Commit: 765307f415301bcf660b7d2d75d50f0e6aa32ddd Parents: 6935b50 Author: CodingCat <zhunans...@gmail.com> Authored: Tue Mar 15 10:10:23 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Mar 15 10:10:33 2016 +0000 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 21 ++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/765307f4/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5d97c63..37b4dd5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -703,15 +703,28 @@ private[deploy] class Master( * every time a new app joins or resource
availability changes. */ private def schedule(): Unit = { - if (state != RecoveryState.ALIVE) { return } + if (state != RecoveryState.ALIVE) { + return + } // Drivers take strict precedence over executors - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { + val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) + val numWorkersAlive = shuffledAliveWorkers.size + var curPos = 0 + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + var launched = false + var numWorkersVisited = 0 + while (numWorkersVisited < numWorkersAlive && !launched) { + val worker = shuffledAliveWorkers(curPos) + numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver + launched = true } + curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org