This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 99e503c  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
99e503c is described below

commit 99e503cebfd9cb19372c88b0dd70c6743f864454
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Sep 27 11:18:32 2019 -0700

    [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

    ### What changes were proposed in this pull request?

    `availableSlots` is computed once, before the for loop that iterates over all TaskSets in `resourceOffers()`. But the number of free slots shrinks in every iteration, as slots are consumed by the task sets already served. The slot count checked by a barrier task set therefore has to be recomputed from `availableCpus` in every iteration.

    ### Why are the changes needed?

    Bugfix. Without it, `resourceOffers()` could attempt to start a barrier task set even though it does not have enough slots available. That attempt would be caught by the `require` in line 519, which throws an exception; the exception is caught and ignored by the Dispatcher's MessageLoop, so nothing terrible happens, but it prevents `resourceOffers()` from considering any further TaskSets. Note that launching the barrier TaskSet can still fail if other requirements are not satisfied, and can still be rolled back by the exception thrown in that `require`. Handling this more gracefully remains a TODO in SPARK-24818, but this fix at least resolves the case where the task set cannot launch because of insufficient slots.

    ### Does this PR introduce any user-facing change?

    No

    ### How was this patch tested?

    Added UT

    Closes #23375
    Closes #25946 from juliuszsompolski/SPARK-29263.

    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    (cherry picked from commit 420abb457df0f422f73bab19a6ed6d7c6bab3173)
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala      | 36 +++++++++++----
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 ++++++++++++++++------
 3 files changed, 65 insertions(+), 24 deletions(-)
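To make the slot-accounting bug concrete before the patch itself, here is a minimal, self-contained Scala sketch of the scheduling loop. All names (`SlotAccountingSketch`, `TaskSetInfo`, `schedule`) are illustrative stand-ins, not the real `TaskSchedulerImpl` internals; a "slot" means `CPUS_PER_TASK` free CPUs on one executor.

```scala
// Sketch only: simplified stand-ins for TaskSchedulerImpl's bookkeeping.
object SlotAccountingSketch {
  val CPUS_PER_TASK = 2

  case class TaskSetInfo(numTasks: Int, isBarrier: Boolean)

  def schedule(offeredCores: Seq[Int], sortedTaskSets: Seq[TaskSetInfo]): Unit = {
    // One entry per executor; decremented as tasks are launched.
    val availableCpus = offeredCores.toArray

    // Before the fix, the slot count was computed once, here, outside the
    // loop -- so later iterations checked a stale value:
    //   val availableSlots = offeredCores.map(_ / CPUS_PER_TASK).sum

    for (taskSet <- sortedTaskSets) {
      // The fix: recompute from the current availableCpus in every iteration.
      val availableSlots = availableCpus.map(_ / CPUS_PER_TASK).sum
      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
        // Skip the barrier task set: it must launch all its tasks at once.
      } else {
        // Launch greedily, consuming CPUS_PER_TASK per task from the offers.
        var remaining = taskSet.numTasks
        for (i <- availableCpus.indices if remaining > 0) {
          val launched = math.min(availableCpus(i) / CPUS_PER_TASK, remaining)
          availableCpus(i) -= launched * CPUS_PER_TASK
          remaining -= launched
        }
      }
    }
  }
}
```

With the numbers used by the new test below (three executors with 3 cores each, `CPUS_PER_TASK = 2`), a single non-barrier task leaves only 2 whole slots, so a 3-task barrier set is skipped cleanly instead of tripping the `require` later on.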
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index e194b79..38dbbe7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -391,7 +391,6 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -405,6 +404,7 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
+      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index b29d32f..abc8841 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -42,15 +42,23 @@ object FakeTask {
    * locations for each task (given as varargs) if this sequence is not empty.
    */
   def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }

-  def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*)
   }

-  def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*):
-    TaskSet = {
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      priority: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
@@ -65,6 +73,15 @@ object FakeTask {
       stageId: Int,
       stageAttemptId: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createShuffleMapTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*)
+  }
+
+  def createShuffleMapTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      priority: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
@@ -74,17 +91,18 @@ object FakeTask {
       }, prefLocs(i), new Properties,
         SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
   }

   def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createBarrierTaskSet(numTasks, stageId = 0, stageAttempId = 0, prefLocs: _*)
+    createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }

   def createBarrierTaskSet(
       numTasks: Int,
       stageId: Int,
-      stageAttempId: Int,
+      stageAttemptId: Int,
+      priority: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
@@ -92,6 +110,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true)
     }
-    new TaskSet(tasks, stageId, stageAttempId, priority = 0, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
   }
 }
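The `priority` parameter threaded through these test helpers is what lets the new test below pit a barrier task set against a higher-priority non-barrier one: under FIFO scheduling, the task set with the lower `priority` value is offered resources first. A usage sketch mirroring that test (`taskScheduler` is assumed to come from the suite's setup helpers):

```scala
// Usage sketch of the extended FakeTask helpers (names taken from the test
// below; a lower priority value means scheduled first under FIFO).
val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0)
val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1)
taskScheduler.submitTasks(highPrio)
taskScheduler.submitTasks(barrier)
// On the next resourceOffers() call the pool serves highPrio first; the
// barrier set then sees the reduced slot count and is skipped cleanly.
```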
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 8b60f8a..5c0601eb03 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -214,19 +214,19 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.taskSetManagerForAttempt(taskset.stageId, taskset.stageAttemptId).get.isZombie
     }

-    val attempt1 = FakeTask.createTaskSet(1, 0)
+    val attempt1 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 0)
     taskScheduler.submitTasks(attempt1)
     // The first submitted taskset is active
     assert(!isTasksetZombie(attempt1))

-    val attempt2 = FakeTask.createTaskSet(1, 1)
+    val attempt2 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 1)
     taskScheduler.submitTasks(attempt2)
     // The first submitted taskset is zombie now
     assert(isTasksetZombie(attempt1))
     // The newly submitted taskset is active
     assert(!isTasksetZombie(attempt2))

-    val attempt3 = FakeTask.createTaskSet(1, 2)
+    val attempt3 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 2)
     taskScheduler.submitTasks(attempt3)
     // The first submitted taskset remains zombie
     assert(isTasksetZombie(attempt1))
@@ -241,7 +241,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val numFreeCores = 1
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))

-    val attempt1 = FakeTask.createTaskSet(10)
+    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)

     // submit attempt 1, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt1)
@@ -257,7 +257,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions2.length)

     // if we schedule another attempt for the same stage, it should get scheduled
-    val attempt2 = FakeTask.createTaskSet(10, 1)
+    val attempt2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)

     // submit attempt 2, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt2)
@@ -273,7 +273,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val numFreeCores = 10
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))

-    val attempt1 = FakeTask.createTaskSet(10)
+    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)

     // submit attempt 1, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt1)
@@ -289,7 +289,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions2.length)

     // submit attempt 2
-    val attempt2 = FakeTask.createTaskSet(10, 1)
+    val attempt2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
     taskScheduler.submitTasks(attempt2)

     // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were
@@ -483,7 +483,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

   test("abort stage when all executors are blacklisted and we cannot acquire new executor") {
     taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
-    val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 10)
     taskScheduler.submitTasks(taskSet)

     val tsm = stageToMockTaskSetManager(0)
@@ -525,7 +525,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")

     // We have only 1 task remaining with 1 executor
-    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 1)
     taskScheduler.submitTasks(taskSet)

     val tsm = stageToMockTaskSetManager(0)
@@ -561,7 +561,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")

     // We have only 1 task remaining with 1 executor
-    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 1)
     taskScheduler.submitTasks(taskSet)

     val tsm = stageToMockTaskSetManager(0)
@@ -907,7 +907,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()
-    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
+    taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0,
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
     ))

@@ -945,7 +945,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

   test("scheduler checks for executors that can be expired from blacklist") {
     taskScheduler = setupScheduler()
-    taskScheduler.submitTasks(FakeTask.createTaskSet(1, 0))
+    taskScheduler.submitTasks(FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 0))
     taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1)
     )).flatten
@@ -1251,6 +1251,29 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(3 === taskDescriptions.length)
   }

+  test("SPARK-29263: barrier TaskSet can't schedule when higher prio taskset takes the slots") {
+    val taskCpus = 2
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")),
+      new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629")))
+    val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1)
+    val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0)
+
+    // submit highPrio and barrier taskSet
+    taskScheduler.submitTasks(highPrio)
+    taskScheduler.submitTasks(barrier)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    // it schedules the highPrio task first, and then will not have enough slots to schedule
+    // the barrier taskset
+    assert(1 === taskDescriptions.length)
+  }
+
   test("cancelTasks shall kill all the running tasks and fail the stage") {
     val taskScheduler = setupScheduler()

@@ -1266,7 +1289,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       }
     })

-    val attempt1 = FakeTask.createTaskSet(10, 0)
+    val attempt1 = FakeTask.createTaskSet(10)
     taskScheduler.submitTasks(attempt1)

     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
@@ -1297,7 +1320,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       }
     })

-    val attempt1 = FakeTask.createTaskSet(10, 0)
+    val attempt1 = FakeTask.createTaskSet(10)
     taskScheduler.submitTasks(attempt1)

     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org