Repository: spark
Updated Branches:
  refs/heads/master fb5d43fb2 -> 1cf19760d


[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code

This also fixes a small issue in the standalone Master that was uncovered by 
the new tests. For more detail, read the description of SPARK-9353.

Author: Andrew Or <[email protected]>

Closes #7668 from andrewor14/standalone-scheduling-tests and squashes the 
following commits:

d852faf [Andrew Or] Add tests + fix scheduling with memory limits


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cf19760
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cf19760
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cf19760

Branch: refs/heads/master
Commit: 1cf19760d61a5a17bd175a906d34a2940141b76d
Parents: fb5d43f
Author: Andrew Or <[email protected]>
Authored: Sun Jul 26 13:03:13 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Sun Jul 26 13:03:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../spark/deploy/master/MasterSuite.scala       | 199 ++++++++++++++++++-
 2 files changed, 202 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1cf19760/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 029f94d..51b3f0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -559,7 +559,7 @@ private[master] class Master(
    * allocated at a time, 12 cores from each worker would be assigned to each executor.
    * Since 12 < 16, no executors would launch [SPARK-8881].
    */
-  private[master] def scheduleExecutorsOnWorkers(
+  private def scheduleExecutorsOnWorkers(
       app: ApplicationInfo,
       usableWorkers: Array[WorkerInfo],
       spreadOutApps: Boolean): Array[Int] = {
@@ -585,7 +585,11 @@ private[master] class Master(
         while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
           coresToAssign -= coresPerExecutor
           assignedCores(pos) += coresPerExecutor
-          assignedMemory(pos) += memoryPerExecutor
+          // If cores per executor is not set, we are assigning 1 core at a time
+          // without actually meaning to launch 1 executor for each core assigned
+          if (app.desc.coresPerExecutor.isDefined) {
+            assignedMemory(pos) += memoryPerExecutor
+          }
 
           // Spreading out an application means spreading out its executors across as
           // many workers as possible. If we are not spreading out, then we should keep

http://git-wip-us.apache.org/repos/asf/spark/blob/1cf19760/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index a8fbaf1..4d7016d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -25,14 +25,15 @@ import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.Matchers
+import org.scalatest.{Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually
 import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
+import org.apache.spark.rpc.RpcEnv
 
-class MasterSuite extends SparkFunSuite with Matchers with Eventually {
+class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
 
   test("can use a custom recovery mode factory") {
     val conf = new SparkConf(loadDefaults = false)
@@ -142,4 +143,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
     }
   }
 
+  test("basic scheduling - spread out") {
+    testBasicScheduling(spreadOut = true)
+  }
+
+  test("basic scheduling - no spread out") {
+    testBasicScheduling(spreadOut = false)
+  }
+
+  test("scheduling with max cores - spread out") {
+    testSchedulingWithMaxCores(spreadOut = true)
+  }
+
+  test("scheduling with max cores - no spread out") {
+    testSchedulingWithMaxCores(spreadOut = false)
+  }
+
+  test("scheduling with cores per executor - spread out") {
+    testSchedulingWithCoresPerExecutor(spreadOut = true)
+  }
+
+  test("scheduling with cores per executor - no spread out") {
+    testSchedulingWithCoresPerExecutor(spreadOut = false)
+  }
+
+  test("scheduling with cores per executor AND max cores - spread out") {
+    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+  }
+
+  test("scheduling with cores per executor AND max cores - no spread out") {
+    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+  }
+
+  private def testBasicScheduling(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(1024)
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    val scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 10)
+    assert(scheduledCores(1) === 10)
+    assert(scheduledCores(2) === 10)
+  }
+
+  private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
+    val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    var scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    // With spreading out, each worker should be assigned a few cores
+    if (spreadOut) {
+      assert(scheduledCores(0) === 3)
+      assert(scheduledCores(1) === 3)
+      assert(scheduledCores(2) === 2)
+    } else {
+      // Without spreading out, the cores should be concentrated on the first worker
+      assert(scheduledCores(0) === 8)
+      assert(scheduledCores(1) === 0)
+      assert(scheduledCores(2) === 0)
+    }
+    // Now test the same thing with max cores > cores per worker
+    scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 6)
+      assert(scheduledCores(1) === 5)
+      assert(scheduledCores(2) === 5)
+    } else {
+      // Without spreading out, the first worker should be fully booked,
+      // and the leftover cores should spill over to the second worker only.
+      assert(scheduledCores(0) === 10)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 0)
+    }
+  }
+
+  private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
+    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
+    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    // Each worker should end up with 4 executors with 2 cores each
+    // This should be 4 because of the memory restriction on each worker
+    var scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 8)
+    assert(scheduledCores(1) === 8)
+    assert(scheduledCores(2) === 8)
+    // Now test the same thing without running into the worker memory limit
+    // Each worker should now end up with 5 executors with 2 cores each
+    scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 10)
+    assert(scheduledCores(1) === 10)
+    assert(scheduledCores(2) === 10)
+    // Now test the same thing with a cores per executor that 10 is not divisible by
+    scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    assert(scheduledCores(0) === 9)
+    assert(scheduledCores(1) === 9)
+    assert(scheduledCores(2) === 9)
+  }
+
+  // Sorry for the long method name!
+  private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
+    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
+    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
+    val workerInfo = makeWorkerInfo(4096, 10)
+    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+    // We should only launch two executors, each with exactly 2 cores
+    var scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 2)
+      assert(scheduledCores(1) === 2)
+      assert(scheduledCores(2) === 0)
+    } else {
+      assert(scheduledCores(0) === 4)
+      assert(scheduledCores(1) === 0)
+      assert(scheduledCores(2) === 0)
+    }
+    // Test max cores > number of cores per worker
+    scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 8)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 6)
+    } else {
+      assert(scheduledCores(0) === 10)
+      assert(scheduledCores(1) === 10)
+      assert(scheduledCores(2) === 0)
+    }
+    // Test max cores > number of cores per worker AND
+    // a cores per executor that 10 is not divisible by
+    scheduledCores = master.invokePrivate(
+      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
+    assert(scheduledCores.length === 3)
+    if (spreadOut) {
+      assert(scheduledCores(0) === 6)
+      assert(scheduledCores(1) === 6)
+      assert(scheduledCores(2) === 6)
+    } else {
+      assert(scheduledCores(0) === 9)
+      assert(scheduledCores(1) === 9)
+      assert(scheduledCores(2) === 0)
+    }
+  }
+
+  // ===============================
+  // | Utility methods for testing |
+  // ===============================
+
+  private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+
+  private def makeMaster(conf: SparkConf = new SparkConf): Master = {
+    val securityMgr = new SecurityManager(conf)
+    val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 7077, conf, securityMgr)
+    val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf)
+    master
+  }
+
+  private def makeAppInfo(
+      memoryPerExecutorMb: Int,
+      coresPerExecutor: Option[Int] = None,
+      maxCores: Option[Int] = None): ApplicationInfo = {
+    val desc = new ApplicationDescription(
+      "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
+    val appId = System.currentTimeMillis.toString
+    new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
+  }
+
+  private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
+    val workerId = System.currentTimeMillis.toString
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to