Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6935b5080 -> 765307f41


[SPARK-13803] restore the changes in SPARK-3411

## What changes were proposed in this pull request?

This patch balances the load of cluster-mode drivers across workers.

This patch restores the changes in https://github.com/apache/spark/pull/1106,
which were erased by the merge of https://github.com/apache/spark/pull/731.

## How was this patch tested?

Tested with the existing test cases.

Author: CodingCat <zhunans...@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365bbe9ff6518cde9402ee8843ec1002fff5b)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/765307f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/765307f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/765307f4

Branch: refs/heads/branch-1.6
Commit: 765307f415301bcf660b7d2d75d50f0e6aa32ddd
Parents: 6935b50
Author: CodingCat <zhunans...@gmail.com>
Authored: Tue Mar 15 10:10:23 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Mar 15 10:10:33 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 21 ++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/765307f4/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5d97c63..37b4dd5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -703,15 +703,28 @@ private[deploy] class Master(
    * every time a new app joins or resource availability changes.
    */
   private def schedule(): Unit = {
-    if (state != RecoveryState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) {
+      return
+    }
     // Drivers take strict precedence over executors
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val numWorkersAlive = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      var launched = false
+      var numWorkersVisited = 0
+      while (numWorkersVisited < numWorkersAlive && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % numWorkersAlive
       }
     }
     startExecutorsOnWorkers()
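
The round-robin placement in the hunk above can be sketched in isolation as follows. This is a minimal illustration, not the code in Master.scala: `Worker`, `Driver`, and `RoundRobinSketch.assign` are hypothetical stand-ins for Spark's own classes, the worker order is taken as given rather than shuffled so the result is deterministic, and decrementing free memory and cores on placement is an assumption standing in for whatever bookkeeping `launchDriver` performs.

```scala
// Hypothetical stand-ins for Spark's worker and driver records; not the real classes.
final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
final case class Driver(id: String, mem: Int, cores: Int)

object RoundRobinSketch {
  /** Returns driverId -> workerId for every driver that could be placed. */
  def assign(aliveWorkers: IndexedSeq[Worker], waiting: Seq[Driver]): Map[String, String] = {
    val placed = scala.collection.mutable.LinkedHashMap.empty[String, String]
    val numWorkersAlive = aliveWorkers.size
    var curPos = 0 // carried across drivers, so each search resumes where the last one stopped
    for (driver <- waiting) {
      var launched = false
      var numWorkersVisited = 0
      // Visit each alive worker at most once per driver.
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = aliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumption: placing a driver consumes the worker's free resources.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placed(driver.id) = worker.id
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placed.toMap
  }
}
```

Because `curPos` is not reset between drivers, consecutive drivers land on consecutive workers when resources allow, whereas the removed nested loop let the first worker in the shuffled order take every waiting driver it had room for.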

