Repository: spark Updated Branches: refs/heads/branch-1.4 43035b4b4 -> c103c99d2
[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code This also fixes a small issue in the standalone Master that was uncovered by the new tests. For more detail, read the description of SPARK-9353. Author: Andrew Or <[email protected]> Closes #7668 from andrewor14/standalone-scheduling-tests and squashes the following commits: d852faf [Andrew Or] Add tests + fix scheduling with memory limits Conflicts: core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c103c99d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c103c99d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c103c99d Branch: refs/heads/branch-1.4 Commit: c103c99d23952a329ae56bde405e194ae7aac301 Parents: 43035b4 Author: Andrew Or <[email protected]> Authored: Sun Jul 26 13:03:13 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Mon Jul 27 17:58:58 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 135 ++++++++------- .../spark/deploy/master/MasterSuite.scala | 170 +++++++++++++++++++ 2 files changed, 242 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c103c99d/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index ac74eba..821c430 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -532,67 +532,6 @@ private[master] class Master( } /** - * Schedule executors to be launched on the workers. 
- * Returns an array containing number of cores assigned to each worker. - * - * There are two modes of launching executors. The first attempts to spread out an application's - * executors on as many workers as possible, while the second does the opposite (i.e. launch them - * on as few workers as possible). The former is usually better for data locality purposes and is - * the default. - * - * The number of cores assigned to each executor is configurable. When this is explicitly set, - * multiple executors from the same application may be launched on the same worker if the worker - * has enough cores and memory. Otherwise, each executor grabs all the cores available on the - * worker by default, in which case only one executor may be launched on each worker. - * - * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core - * at a time). Consider the following example: cluster has 4 workers with 16 cores each. - * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is - * allocated at a time, 12 cores from each worker would be assigned to each executor. - * Since 12 < 16, no executors would launch [SPARK-8881]. 
- */ - private[master] def scheduleExecutorsOnWorkers( - app: ApplicationInfo, - usableWorkers: Array[WorkerInfo], - spreadOutApps: Boolean): Array[Int] = { - // If the number of cores per executor is not specified, then we can just schedule - // 1 core at a time since we expect a single executor to be launched on each worker - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val numUsable = usableWorkers.length - val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker - val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker - var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) - var freeWorkers = (0 until numUsable).toIndexedSeq - - def canLaunchExecutor(pos: Int): Boolean = { - usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor && - usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor - } - - while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) { - freeWorkers = freeWorkers.filter(canLaunchExecutor) - freeWorkers.foreach { pos => - var keepScheduling = true - while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { - coresToAssign -= coresPerExecutor - assignedCores(pos) += coresPerExecutor - assignedMemory(pos) += memoryPerExecutor - - // Spreading out an application means spreading out its executors across as - // many workers as possible. If we are not spreading out, then we should keep - // scheduling executors on this worker until we use all of its resources. - // Otherwise, just move on to the next worker. 
- if (spreadOutApps) { - keepScheduling = false - } - } - } - } - assignedCores - } - - /** * Schedule and launch executors on workers */ private def startExecutorsOnWorkers(): Unit = { @@ -605,7 +544,7 @@ private[master] class Master( .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse - val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) + val assignedCores = Master.scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // Now that we've decided how many cores to allocate on each worker, let's allocate them for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { @@ -930,7 +869,7 @@ private[master] class Master( private[deploy] object Master extends Logging { val systemName = "sparkMaster" - private val actorName = "Master" + val actorName = "Master" def main(argStrings: Array[String]) { SignalLogger.register(log) @@ -982,4 +921,74 @@ private[deploy] object Master extends Logging { val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse] (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) } + + + /** + * Schedule executors to be launched on the workers. + * Returns an array containing number of cores assigned to each worker. + * + * There are two modes of launching executors. The first attempts to spread out an application's + * executors on as many workers as possible, while the second does the opposite (i.e. launch them + * on as few workers as possible). The former is usually better for data locality purposes and is + * the default. + * + * The number of cores assigned to each executor is configurable. When this is explicitly set, + * multiple executors from the same application may be launched on the same worker if the worker + * has enough cores and memory. 
Otherwise, each executor grabs all the cores available on the + * worker by default, in which case only one executor may be launched on each worker. + * + * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core + * at a time). Consider the following example: cluster has 4 workers with 16 cores each. + * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is + * allocated at a time, 12 cores from each worker would be assigned to each executor. + * Since 12 < 16, no executors would launch [SPARK-8881]. + * + * Unfortunately, this must be moved out here into the Master object because Akka allows + * neither creating actors outside of Props nor accessing the Master after setting up the + * actor system. Otherwise, there is no way to test it. + */ + def scheduleExecutorsOnWorkers( + app: ApplicationInfo, + usableWorkers: Array[WorkerInfo], + spreadOutApps: Boolean): Array[Int] = { + // If the number of cores per executor is not specified, then we can just schedule + // 1 core at a time since we expect a single executor to be launched on each worker + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val numUsable = usableWorkers.length + val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker + val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker + var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + var freeWorkers = (0 until numUsable).toIndexedSeq + + def canLaunchExecutor(pos: Int): Boolean = { + usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor && + usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor + } + + while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) { + freeWorkers = freeWorkers.filter(canLaunchExecutor) + freeWorkers.foreach { pos => + var keepScheduling = true + 
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { + coresToAssign -= coresPerExecutor + assignedCores(pos) += coresPerExecutor + // If cores per executor is not set, we are assigning 1 core at a time + // without actually meaning to launch 1 executor for each core assigned + if (app.desc.coresPerExecutor.isDefined) { + assignedMemory(pos) += memoryPerExecutor + } + + // Spreading out an application means spreading out its executors across as + // many workers as possible. If we are not spreading out, then we should keep + // scheduling executors on this worker until we use all of its resources. + // Otherwise, just move on to the next worker. + if (spreadOutApps) { + keepScheduling = false + } + } + } + } + assignedCores + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c103c99d/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 014e87b..3eabc1a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -184,4 +184,174 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually { } } + test("basic scheduling - spread out") { + testBasicScheduling(spreadOut = true) + } + + test("basic scheduling - no spread out") { + testBasicScheduling(spreadOut = false) + } + + test("scheduling with max cores - spread out") { + testSchedulingWithMaxCores(spreadOut = true) + } + + test("scheduling with max cores - no spread out") { + testSchedulingWithMaxCores(spreadOut = false) + } + + test("scheduling with cores per executor - spread out") { + testSchedulingWithCoresPerExecutor(spreadOut = true) + } + + test("scheduling with cores per executor - no spread out") { + 
testSchedulingWithCoresPerExecutor(spreadOut = false) + } + + test("scheduling with cores per executor AND max cores - spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) + } + + test("scheduling with cores per executor AND max cores - no spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) + } + + private def testBasicScheduling(spreadOut: Boolean): Unit = { + val appInfo = makeAppInfo(1024) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + val scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + } + + private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) + val appInfo2 = makeAppInfo(1024, maxCores = Some(16)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + // With spreading out, each worker should be assigned a few cores + if (spreadOut) { + assert(scheduledCores(0) === 3) + assert(scheduledCores(1) === 3) + assert(scheduledCores(2) === 2) + } else { + // Without spreading out, the cores should be concentrated on the first worker + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Now test the same thing with max cores > cores per worker + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 5) + assert(scheduledCores(2) === 5) + } else { + // Without spreading out, the first worker should be fully booked, + // 
and the leftover cores should spill over to the second worker only. + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 0) + } + } + + private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // Each worker should end up with 4 executors with 2 cores each + // This should be 4 because of the memory restriction on each worker + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 8) + assert(scheduledCores(2) === 8) + // Now test the same thing without running into the worker memory limit + // Each worker should now end up with 5 executors with 2 cores each + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + // Now test the same thing with a cores per executor that 10 is not divisible by + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 9) + } + + // Sorry for the long method name! 
+ private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // We should only launch two executors, each with exactly 2 cores + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 2) + assert(scheduledCores(1) === 2) + assert(scheduledCores(2) === 0) + } else { + assert(scheduledCores(0) === 4) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker AND + // a cores per executor that 10 is not divisible by + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 0) + } + } + + // =============================== + // | Utility methods for testing | + // =============================== + + private def makeAppInfo( + memoryPerExecutorMb: Int, + coresPerExecutor: Option[Int] = None, + maxCores:
Option[Int] = None): ApplicationInfo = { + val desc = new ApplicationDescription( + "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor) + val appId = System.currentTimeMillis.toString + new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) + } + + private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { + val workerId = System.currentTimeMillis.toString + new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
