Repository: spark
Updated Branches:
  refs/heads/branch-1.4 43035b4b4 -> c103c99d2


[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code

This also fixes a small issue in the standalone Master that was uncovered by 
the new tests. For more detail, read the description of SPARK-9353.

Author: Andrew Or <[email protected]>

Closes #7668 from andrewor14/standalone-scheduling-tests and squashes the 
following commits:

d852faf [Andrew Or] Add tests + fix scheduling with memory limits

Conflicts:
        core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c103c99d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c103c99d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c103c99d

Branch: refs/heads/branch-1.4
Commit: c103c99d23952a329ae56bde405e194ae7aac301
Parents: 43035b4
Author: Andrew Or <[email protected]>
Authored: Sun Jul 26 13:03:13 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Mon Jul 27 17:58:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 135 ++++++++-------
 .../spark/deploy/master/MasterSuite.scala       | 170 +++++++++++++++++++
 2 files changed, 242 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c103c99d/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ac74eba..821c430 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -532,67 +532,6 @@ private[master] class Master(
   }
 
   /**
-   * Schedule executors to be launched on the workers.
-   * Returns an array containing number of cores assigned to each worker.
-   *
-   * There are two modes of launching executors. The first attempts to spread 
out an application's
-   * executors on as many workers as possible, while the second does the 
opposite (i.e. launch them
-   * on as few workers as possible). The former is usually better for data 
locality purposes and is
-   * the default.
-   *
-   * The number of cores assigned to each executor is configurable. When this 
is explicitly set,
-   * multiple executors from the same application may be launched on the same 
worker if the worker
-   * has enough cores and memory. Otherwise, each executor grabs all the cores 
available on the
-   * worker by default, in which case only one executor may be launched on 
each worker.
-   *
-   * It is important to allocate coresPerExecutor on each worker at a time 
(instead of 1 core
-   * at a time). Consider the following example: cluster has 4 workers with 16 
cores each.
-   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 
16). If 1 core is
-   * allocated at a time, 12 cores from each worker would be assigned to each 
executor.
-   * Since 12 < 16, no executors would launch [SPARK-8881].
-   */
-  private[master] def scheduleExecutorsOnWorkers(
-      app: ApplicationInfo,
-      usableWorkers: Array[WorkerInfo],
-      spreadOutApps: Boolean): Array[Int] = {
-    // If the number of cores per executor is not specified, then we can just 
schedule
-    // 1 core at a time since we expect a single executor to be launched on 
each worker
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
-    val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val numUsable = usableWorkers.length
-    val assignedCores = new Array[Int](numUsable) // Number of cores to give 
to each worker
-    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give 
to each worker
-    var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
-    var freeWorkers = (0 until numUsable).toIndexedSeq
-
-    def canLaunchExecutor(pos: Int): Boolean = {
-      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
-      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
-    }
-
-    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
-      freeWorkers = freeWorkers.filter(canLaunchExecutor)
-      freeWorkers.foreach { pos =>
-        var keepScheduling = true
-        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= 
coresPerExecutor) {
-          coresToAssign -= coresPerExecutor
-          assignedCores(pos) += coresPerExecutor
-          assignedMemory(pos) += memoryPerExecutor
-
-          // Spreading out an application means spreading out its executors 
across as
-          // many workers as possible. If we are not spreading out, then we 
should keep
-          // scheduling executors on this worker until we use all of its 
resources.
-          // Otherwise, just move on to the next worker.
-          if (spreadOutApps) {
-            keepScheduling = false
-          }
-        }
-      }
-    }
-    assignedCores
-  }
-
-  /**
    * Schedule and launch executors on workers
    */
   private def startExecutorsOnWorkers(): Unit = {
@@ -605,7 +544,7 @@ private[master] class Master(
         .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
           worker.coresFree >= coresPerExecutor.getOrElse(1))
         .sortBy(_.coresFree).reverse
-      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, 
spreadOutApps)
+      val assignedCores = Master.scheduleExecutorsOnWorkers(app, 
usableWorkers, spreadOutApps)
 
       // Now that we've decided how many cores to allocate on each worker, 
let's allocate them
       for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
@@ -930,7 +869,7 @@ private[master] class Master(
 
 private[deploy] object Master extends Logging {
   val systemName = "sparkMaster"
-  private val actorName = "Master"
+  val actorName = "Master"
 
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
@@ -982,4 +921,74 @@ private[deploy] object Master extends Logging {
     val portsResponse = Await.result(portsRequest, 
timeout).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
   }
+
+
+  /**
+   * Schedule executors to be launched on the workers.
+   * Returns an array containing number of cores assigned to each worker.
+   *
+   * There are two modes of launching executors. The first attempts to spread 
out an application's
+   * executors on as many workers as possible, while the second does the 
opposite (i.e. launch them
+   * on as few workers as possible). The former is usually better for data 
locality purposes and is
+   * the default.
+   *
+   * The number of cores assigned to each executor is configurable. When this 
is explicitly set,
+   * multiple executors from the same application may be launched on the same 
worker if the worker
+   * has enough cores and memory. Otherwise, each executor grabs all the cores 
available on the
+   * worker by default, in which case only one executor may be launched on 
each worker.
+   *
+   * It is important to allocate coresPerExecutor on each worker at a time 
(instead of 1 core
+   * at a time). Consider the following example: cluster has 4 workers with 16 
cores each.
+   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 
16). If 1 core is
+   * allocated at a time, 12 cores from each worker would be assigned to each 
executor.
+   * Since 12 < 16, no executors would launch [SPARK-8881].
+   *
+   * Unfortunately, this must be moved out here into the Master object because 
Akka allows
+   * neither creating actors outside of Props nor accessing the Master after 
setting up the
+   * actor system. Otherwise, there is no way to test it.
+   */
+  def scheduleExecutorsOnWorkers(
+      app: ApplicationInfo,
+      usableWorkers: Array[WorkerInfo],
+      spreadOutApps: Boolean): Array[Int] = {
+    // If the number of cores per executor is not specified, then we can just 
schedule
+    // 1 core at a time since we expect a single executor to be launched on 
each worker
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val numUsable = usableWorkers.length
+    val assignedCores = new Array[Int](numUsable) // Number of cores to give 
to each worker
+    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give 
to each worker
+    var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
+    var freeWorkers = (0 until numUsable).toIndexedSeq
+
+    def canLaunchExecutor(pos: Int): Boolean = {
+      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+        usableWorkers(pos).memoryFree - assignedMemory(pos) >= 
memoryPerExecutor
+    }
+
+    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
+      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers.foreach { pos =>
+        var keepScheduling = true
+        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= 
coresPerExecutor) {
+          coresToAssign -= coresPerExecutor
+          assignedCores(pos) += coresPerExecutor
+          // If cores per executor is not set, we are assigning 1 core at a 
time
+          // without actually meaning to launch 1 executor for each core 
assigned
+          if (app.desc.coresPerExecutor.isDefined) {
+            assignedMemory(pos) += memoryPerExecutor
+          }
+
+          // Spreading out an application means spreading out its executors 
across as
+          // many workers as possible. If we are not spreading out, then we 
should keep
+          // scheduling executors on this worker until we use all of its 
resources.
+          // Otherwise, just move on to the next worker.
+          if (spreadOutApps) {
+            keepScheduling = false
+          }
+        }
+      }
+    }
+    assignedCores
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c103c99d/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 014e87b..3eabc1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -184,4 +184,174 @@ class MasterSuite extends SparkFunSuite with Matchers 
with Eventually {
     }
   }
 
+  test("basic scheduling - spread out") {
+    testBasicScheduling(spreadOut = true)
+  }
+
+  test("basic scheduling - no spread out") {
+    testBasicScheduling(spreadOut = false)
+  }
+
+  test("scheduling with max cores - spread out") {
+    testSchedulingWithMaxCores(spreadOut = true)
+  }
+
+  test("scheduling with max cores - no spread out") {
+    testSchedulingWithMaxCores(spreadOut = false)
+  }
+
+  test("scheduling with cores per executor - spread out") {
+    testSchedulingWithCoresPerExecutor(spreadOut = true)
+  }
+
+  test("scheduling with cores per executor - no spread out") {
+    testSchedulingWithCoresPerExecutor(spreadOut = false)
+  }
+
+  test("scheduling with cores per executor AND max cores - spread out") {
+    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+  }
+
+  test("scheduling with cores per executor AND max cores - no spread out") {
+    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+  }
+
+  private def testBasicScheduling(spreadOut: Boolean): Unit = {
+    val appInfo = makeAppInfo(1024)
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    val scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo, 
workerInfos, spreadOut)
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 10)
+    assert(scheduledCores(1) === 10)
+    assert(scheduledCores(2) === 10)
+  }
+
+  private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+    val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
+    val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, 
workerInfos, spreadOut)
+    assert(scheduledCores.length === 3)
+    // With spreading out, each worker should be assigned a few cores
+    if (spreadOut) {
+      assert(scheduledCores(0) === 3)
+      assert(scheduledCores(1) === 3)
+      assert(scheduledCores(2) === 2)
+    } else {
+      // Without spreading out, the cores should be concentrated on the first 
worker
+      assert(scheduledCores(0) === 8)
+      assert(scheduledCores(1) === 0)
+      assert(scheduledCores(2) === 0)
+    }
+    // Now test the same thing with max cores > cores per worker
+    scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, 
spreadOut)
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 6)
+      assert(scheduledCores(1) === 5)
+      assert(scheduledCores(2) === 5)
+    } else {
+      // Without spreading out, the first worker should be fully booked,
+      // and the leftover cores should spill over to the second worker only.
+      assert(scheduledCores(0) === 10)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 0)
+    }
+  }
+
+  private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+    val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
+    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
+    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    // Each worker should end up with 4 executors with 2 cores each
+    // This should be 4 because of the memory restriction on each worker
+    var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, 
workerInfos, spreadOut)
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 8)
+    assert(scheduledCores(1) === 8)
+    assert(scheduledCores(2) === 8)
+    // Now test the same thing without running into the worker memory limit
+    // Each worker should now end up with 5 executors with 2 cores each
+    scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, 
spreadOut)
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 10)
+    assert(scheduledCores(1) === 10)
+    assert(scheduledCores(2) === 10)
+    // Now test the same thing with a cores per executor that 10 is not 
divisible by
+    scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, 
spreadOut)
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 9)
+    assert(scheduledCores(1) === 9)
+    assert(scheduledCores(2) === 9)
+  }
+
+  // Sorry for the long method name!
+  private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: 
Boolean): Unit = {
+    val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = 
Some(4))
+    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = 
Some(20))
+    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = 
Some(20))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    // We should only launch two executors, each with exactly 2 cores
+    var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, 
workerInfos, spreadOut)
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 2)
+      assert(scheduledCores(1) === 2)
+      assert(scheduledCores(2) === 0)
+    } else {
+      assert(scheduledCores(0) === 4)
+      assert(scheduledCores(1) === 0)
+      assert(scheduledCores(2) === 0)
+    }
+    // Test max cores > number of cores per worker
+    scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, 
spreadOut)
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 8)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 6)
+    } else {
+      assert(scheduledCores(0) === 10)
+      assert(scheduledCores(1) === 10)
+      assert(scheduledCores(2) === 0)
+    }
+    // Test max cores > number of cores per worker AND
+    // a cores per executor that is 10 is not divisible by
+    scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, 
spreadOut)
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 6)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 6)
+    } else {
+      assert(scheduledCores(0) === 9)
+      assert(scheduledCores(1) === 9)
+      assert(scheduledCores(2) === 0)
+    }
+  }
+
+  // ===============================
+  // | Utility methods for testing |
+  // ===============================
+
+  private def makeAppInfo(
+      memoryPerExecutorMb: Int,
+      coresPerExecutor: Option[Int] = None,
+      maxCores: Option[Int] = None): ApplicationInfo = {
+    val desc = new ApplicationDescription(
+      "test", maxCores, memoryPerExecutorMb, null, "", None, None, 
coresPerExecutor)
+    val appId = System.currentTimeMillis.toString
+    new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
+  }
+
+  private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
+    val workerId = System.currentTimeMillis.toString
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, 
"address")
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to