This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b219f27586b [SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated
b219f27586b is described below
commit b219f27586b80bb552f0ae3a4121c7f12ed1f970
Author: Yi Wu <[email protected]>
AuthorDate: Tue Dec 27 15:37:45 2022 -0600
[SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated
### What changes were proposed in this pull request?
This PR proposes to avoid rerunning a finished shuffle map task in
`TaskSetManager.executorLost()` when the executor loss is caused by decommission
and the task's shuffle data has already been migrated successfully.
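As an illustration, here is a minimal sketch (simplified names, not the actual `TaskSetManager` code) of the decision this change adds: a finished shuffle map task on a lost executor is only re-enqueued if its map output did not survive the host loss.

```scala
// Hedged sketch: `needsRerun`, `mapOutputHost`, and `lostHost` are illustrative
// stand-ins, not real TaskSetManager members. A finished shuffle map task on a
// lost executor must be rerun unless the executor was decommissioned AND its map
// output is known to live on another host.
def needsRerun(
    decommissioned: Boolean,
    mapOutputHost: Option[String], // host reported by the map output tracker
    lostHost: String): Boolean = {
  val outputMigrated = decommissioned && mapOutputHost.exists(_ != lostHost)
  !outputMigrated
}

// needsRerun(decommissioned = true,  Some("host2"), "host1") == false // migrated, keep the result
// needsRerun(decommissioned = true,  Some("host1"), "host1") == true  // output died with the host
// needsRerun(decommissioned = false, Some("host2"), "host1") == true  // non-decommission loss: old behaviour
```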
### Why are the changes needed?
To avoid unnecessary task recomputation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
Closes #39011 from Ngone51/decom-executor-lost.
Authored-by: Yi Wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../scala/org/apache/spark/MapOutputTracker.scala | 11 ++++++
.../org/apache/spark/scheduler/DAGScheduler.scala | 7 +++-
.../scala/org/apache/spark/scheduler/TaskSet.scala | 3 +-
.../apache/spark/scheduler/TaskSetManager.scala | 42 ++++++++++++++++++---
.../spark/deploy/DecommissionWorkerSuite.scala | 3 ++
.../org/apache/spark/scheduler/FakeTask.scala | 6 +--
.../org/apache/spark/scheduler/PoolSuite.scala | 2 +-
.../spark/scheduler/TaskSchedulerImplSuite.scala | 6 +--
.../spark/scheduler/TaskSetManagerSuite.scala | 44 ++++++++++++++++++++--
9 files changed, 105 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 3b5a21df4d6..a163fef693e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1128,6 +1128,17 @@ private[spark] class MapOutputTrackerMaster(
}
}
+  /**
+   * Get map output location by (shuffleId, mapId)
+   */
+  def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = {
+    shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
+      shuffleStatus.withMapStatuses { mapStatues =>
+        mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location)
+      }
+    }
+  }
+
def incrementEpoch(): Unit = {
epochLock.synchronized {
epoch += 1
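As a hedged usage sketch of the new lookup (driver-internal code only, since `MapOutputTrackerMaster` is `private[spark]`; the shuffleId/mapId values below are made up for illustration):

```scala
import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.storage.BlockManagerId

// On the driver, SparkEnv.get.mapOutputTracker is the MapOutputTrackerMaster instance.
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
val location: Option[BlockManagerId] = tracker.getMapOutputLocation(shuffleId = 0, mapId = 3L)
location match {
  case Some(bmId) => println(s"map output for mapId=3 currently lives on ${bmId.host}")
  case None       => println("no map status registered for this mapId")
}
```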
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bb17a987717..cc991178481 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1590,9 +1590,14 @@ private[spark] class DAGScheduler(
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage
(${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
+ val shuffleId = stage match {
+ case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
+ case _: ResultStage => None
+ }
+
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
- stage.resourceProfileId))
+ stage.resourceProfileId, shuffleId))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 7a8ed16f6eb..6411757313e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -29,7 +29,8 @@ private[spark] class TaskSet(
val stageAttemptId: Int,
val priority: Int,
val properties: Properties,
- val resourceProfileId: Int) {
+ val resourceProfileId: Int,
+ val shuffleId: Option[Int]) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index cbb8fd0a334..124a27502fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -76,6 +76,8 @@ private[spark] class TaskSetManager(
val tasks = taskSet.tasks
private val isShuffleMapTasks = tasks(0).isInstanceOf[ShuffleMapTask]
+ // shuffleId is only available when isShuffleMapTasks=true
+ private val shuffleId = taskSet.shuffleId
private[scheduler] val partitionToIndex = tasks.zipWithIndex
.map { case (t, idx) => t.partitionId -> idx }.toMap
val numTasks = tasks.length
@@ -1046,17 +1048,45 @@ private[spark] class TaskSetManager(
  /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
  override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
-    // and we are not using an external shuffle server which could serve the shuffle outputs.
-    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
-    // so we would need to rerun these tasks on other executors.
-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
+    // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
+    // if this is a shuffle map stage, and we are not using an external shuffle server which
+    // could serve the shuffle outputs, or the executor loss is caused by decommission (which
+    // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
+    // data from this dead executor so we would need to rerun these tasks on other executors.
+    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+      (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
+    if (maybeShuffleMapOutputLoss && !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = info.index
+        lazy val isShuffleMapOutputAvailable = reason match {
+          case ExecutorDecommission(_, _) =>
+            val mapId = if (conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+              info.partitionId
+            } else {
+              tid
+            }
+            val locationOpt = env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+              .getMapOutputLocation(shuffleId.get, mapId)
+            // There are 3 cases of locationOpt:
+            // 1) locationOpt.isDefined && locationOpt.get.host == host:
+            //    this case implies that the shuffle map output is still on the lost executor. The
+            //    map output file is supposed to be lost, so we should rerun this task;
+            // 2) locationOpt.isDefined && locationOpt.get.host != host:
+            //    this case implies that the shuffle map output has been migrated to another
+            //    host. The task doesn't need to rerun;
+            // 3) locationOpt.isEmpty:
+            //    this shouldn't happen ideally since TaskSetManager handles executor lost
+            //    before DAGScheduler, so the map statuses for the successful task must be available
+            //    at this moment. Keep it here in case the handling order changes.
+            locationOpt.exists(_.host != host)
+
+          case _ => false
+        }
         // We may have a running task whose partition has been marked as successful,
         // this partition has another task completed in another stage attempt.
         // We treat it as a running task and will call handleFailedTask later.
-        if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
+        if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid) &&
+          !isShuffleMapOutputAvailable) {
successful(index) = false
copiesRunning(index) -= 1
tasksSuccessful -= 1
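To restate the three `locationOpt` cases from the comment above as a standalone predicate (a hypothetical helper for illustration only, not part of the patch):

```scala
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper mirroring the three cases described in the diff's comment.
// Returns true only when the finished task's result can be kept (case 2).
def shuffleOutputSurvivedHostLoss(locationOpt: Option[BlockManagerId], lostHost: String): Boolean =
  locationOpt match {
    case Some(loc) if loc.host == lostHost => false // case 1: output was on the lost host, rerun
    case Some(_)                           => true  // case 2: output migrated elsewhere, keep result
    case None                              => false // case 3: no map status known, be safe and rerun
  }
```

This is exactly what `locationOpt.exists(_.host != host)` expresses in the patch itself.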
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index c2486b9650d..fe9bce770f5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -321,6 +321,9 @@ class DecommissionWorkerSuite
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+      // Task resubmit is a signal to DAGScheduler, not a real task end event. Ignore it here
+      // to avoid over-counting.
+      if (taskEnd.reason == Resubmitted) return
       val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, taskEnd.stageAttemptId)
logInfo(s"Task End $taskSignature")
tasksFinished.add(taskSignature)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index fdd89378927..6ab56d3fffe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -73,7 +73,7 @@ object FakeTask {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
}
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId, None)
}
def createShuffleMapTaskSet(
@@ -100,7 +100,7 @@ object FakeTask {
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
}
new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0))
}
  def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
@@ -129,6 +129,6 @@ object FakeTask {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil,
isBarrier = true)
}
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId, None)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index fa2c5eaee8b..85ade97eb92 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -45,7 +45,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
new FakeTask(stageId, i, Nil)
}
new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0)
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None), 0)
}
  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index eec5449bc72..af4cf8731b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -531,7 +531,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val numFreeCores = 1
val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
- 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
@@ -546,7 +546,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
val taskSet2 = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
- 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ 1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
taskScheduler.submitTasks(taskSet2)
    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
@@ -2160,7 +2160,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
override def index: Int = 1
}, 1, Seq(TaskLocation("host1", "executor1")), new Properties, null)
- val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
+ val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0, Some(0))
taskScheduler.submitTasks(taskSet)
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index a3b9eff8084..45360f486ed 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -670,6 +670,42 @@ class TaskSetManagerSuite
assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
}
+ test("SPARK-41469: task doesn't need to rerun on executor lost if shuffle
data has migrated") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc)
+ val backend = mock(classOf[SchedulerBackend])
+ doNothing().when(backend).reviveOffers()
+ sched.initialize(backend)
+
+ sched.addExecutor("exec0", "host0")
+
+    val mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ mapOutputTracker.registerShuffle(0, 2, 0)
+
+ val taskSet = FakeTask.createShuffleMapTaskSet(2, 0, 0,
+ Seq(TaskLocation("host0", "exec0")), Seq(TaskLocation("host1", "exec1")))
+ sched.submitTasks(taskSet)
+ val manager = sched.taskSetManagerForAttempt(0, 0).get
+
+    // Schedule task 0 and mark it as completed with shuffle map output registered
+ val taskDesc = manager.resourceOffer("exec0", "host0", PROCESS_LOCAL)._1
+ assert(taskDesc.isDefined)
+ val taskIndex = taskDesc.get.index
+ val taskId = taskDesc.get.taskId
+ manager.handleSuccessfulTask(taskId, createTaskResult(taskId.toInt))
+ mapOutputTracker.registerMapOutput(0, taskIndex,
+ MapStatus(BlockManagerId("exec0", "host0", 8848), Array(1024), taskId))
+
+    // Mock executor "exec0" decommission and migrate shuffle map output of task 0
+ manager.executorDecommission("exec0")
+    mapOutputTracker.updateMapOutput(0, taskId, BlockManagerId("exec1", "host1", 8848))
+
+    // Trigger executor "exec0" lost. Since the map output of task 0 has been migrated, it doesn't
+    // need to rerun. So task 0 should still remain in the successful status.
+ manager.executorLost("exec0", "host0", ExecutorDecommission())
+ assert(manager.successful(taskIndex))
+ }
+
test("SPARK-32653: Decommissioned host should not be used to calculate
locality levels") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc)
@@ -770,7 +806,7 @@ class TaskSetManagerSuite
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0,
- null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
assert(!manager.emittedTaskSizeWarning)
@@ -786,7 +822,7 @@ class TaskSetManagerSuite
val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
- 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ 0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
intercept[TaskNotSerializableException] {
@@ -866,7 +902,7 @@ class TaskSetManagerSuite
override def index: Int = 0
}, 1, Seq(TaskLocation("host1", "execA")), new Properties, null)
val taskSet = new TaskSet(Array(singleTask), 0, 0, 0,
- null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Some(0))
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
// Offer host1, which should be accepted as a PROCESS_LOCAL location
@@ -2192,7 +2228,7 @@ class TaskSetManagerSuite
    val tasks = Array.tabulate[Task[_]](2)(partition => new FakeLongTasks(stageId = 0, partition))
    val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
val stageId = taskSet.stageId
val stageAttemptId = taskSet.stageAttemptId
sched.submitTasks(taskSet)