[
https://issues.apache.org/jira/browse/SPARK-10796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen resolved SPARK-10796.
-------------------------------
Resolution: Won't Fix
> The Stage taskSets may are all removed while stage still have pending
> partitions after having lost some executors
> -----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-10796
> URL: https://issues.apache.org/jira/browse/SPARK-10796
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 1.3.0, 1.4.0, 1.5.0
> Reporter: SuYan
> Priority: Minor
>
> desc:
> 1. A running ShuffleMapStage can have multiple TaskSets: one active
> TaskSet, multiple zombie TaskSets, and multiple removed TaskSets.
> A running ShuffleMapStage succeeds only when all of its partitions have
> been processed successfully, that is, when every task's MapStatus has been
> added to outputLocs.
> The MapStatus entries of a running ShuffleMapStage may have been produced by
> removed TaskSet 1, ..., zombie TaskSet 1, zombie TaskSet 2, ..., or active
> TaskSet N, so some outputs may be held only by a removed or zombie TaskSet.
> If an executor is lost, some of the MapStatus entries on that executor may
> have been produced by a zombie TaskSet.
> In the current logic, a lost MapStatus is handled by having each TaskSet
> re-run the tasks that succeeded on the lost executor: the tasks are re-added
> to the TaskSet's pendingTasks, and their partitions are re-added to the
> Stage's pendingPartitions.
> This is useless if the lost MapStatus belongs only to a zombie or removed
> TaskSet: because the TaskSet is a zombie, its pendingTasks will never be
> scheduled.
> The stage is resubmitted only if some task throws a FetchFailedException.
> However, losing the executor may not remove any MapStatus of the parent
> stage of one of the running stages, so no fetch fails, while that running
> stage has lost a MapStatus that belongs only to a zombie or removed TaskSet.
> Once all zombie TaskSets have finished their runningTasks and the active
> TaskSet has finished its pendingTasks, they are removed by
> TaskSchedulerImpl, yet the running stage's pending partitions are still
> non-empty, so the stage hangs.
> {code}
> test("Resubmit stage while lost partition in ZombieTasksets or RemovedTaskSets") {
>   val firstRDD = new MyRDD(sc, 3, Nil)
>   val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
>   val firstShuffleId = firstShuffleDep.shuffleId
>   val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
>   val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
>   submit(reduceRdd, Array(0))
>   // things start out smoothly, stage 0 completes with no issues
>   complete(taskSets(0), Seq(
>     (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>     (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>     (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
>   ))
>   // then start running stage 1: its first task (partition 0) succeeds on hostD
>   runEvent(makeCompletionEvent(
>     taskSets(1).tasks(0),
>     Success,
>     makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>   // simulate a resubmit of stage 1. In stage 1.0, partition 0 has already
>   // finished on hostD, so stage 1.1 only resubmits tasks for partitions 1 and 2
>   runEvent(makeCompletionEvent(
>     taskSets(1).tasks(1),
>     FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
>   scheduler.resubmitFailedStages()
>   val stage1Resubmit1 = taskSets(2)
>   assert(stage1Resubmit1.stageId == 1)
>   assert(stage1Resubmit1.tasks.size == 2)
>   // now exec-hostD is lost, so the output location of stage 1, partition 0 is lost
>   runEvent(ExecutorLost("exec-hostD"))
>   runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
>   // let stage1Resubmit1 complete
>   complete(taskSets(2), Seq(
>     (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>     (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
>   ))
>   // then complete the tasks still running in the stage 1.0 TaskSet
>   runEvent(makeCompletionEvent(
>     taskSets(1).tasks(1),
>     Success,
>     makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>   runEvent(makeCompletionEvent(
>     taskSets(1).tasks(2),
>     Success,
>     makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>   // all running TaskSets for stage 1 have now completed,
>   // yet partition 0 is still pending
>   assert(scheduler.runningStages.head.pendingPartitions.head == 0)
> }
> {code}
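
The hang condition described in the report can also be illustrated outside Spark with a toy model. The sketch below is only an approximation of the behaviour the reporter describes: ToyTaskSet, ToyStage, executorLost, schedulablePartitions and strandedPartitions are hypothetical names made up for illustration and are not Spark classes or methods. It assumes that every TaskSet re-queues tasks whose output lived on the lost executor, and that only non-zombie TaskSets are ever scheduled.

```scala
import scala.collection.mutable

// Toy model only: hypothetical classes, not Spark's scheduler implementation.
final case class ToyTaskSet(
    name: String,
    isZombie: Boolean,
    succeededOn: Map[Int, String], // partition -> executor holding its output
    pendingTasks: mutable.Set[Int] = mutable.Set.empty[Int])

final class ToyStage(val taskSets: Seq[ToyTaskSet]) {
  val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int]

  // Assumed behaviour from the report: each TaskSet, zombie or not, re-queues
  // the tasks whose output was on the lost executor, and the stage marks the
  // corresponding partitions as pending again.
  def executorLost(execId: String): Unit =
    for (ts <- taskSets; (partition, exec) <- ts.succeededOn if exec == execId) {
      ts.pendingTasks += partition
      pendingPartitions += partition
    }

  // Only non-zombie TaskSets are ever offered resources in this model.
  def schedulablePartitions: Set[Int] =
    taskSets.filterNot(_.isZombie).flatMap(_.pendingTasks).toSet

  // Partitions the stage still waits for although no TaskSet will run them.
  def strandedPartitions: Set[Int] =
    pendingPartitions.toSet -- schedulablePartitions
}

object ToyHangDemo {
  def main(args: Array[String]): Unit = {
    // Stage 1.0 became a zombie after producing partition 0 on exec-hostD.
    val zombie = ToyTaskSet("stage 1.0", isZombie = true,
      succeededOn = Map(0 -> "exec-hostD"))
    // Stage 1.1 is active and produced partitions 1 and 2 on exec-hostB.
    val active = ToyTaskSet("stage 1.1", isZombie = false,
      succeededOn = Map(1 -> "exec-hostB", 2 -> "exec-hostB"))
    val stage = new ToyStage(Seq(zombie, active))

    stage.executorLost("exec-hostD")
    println(s"stranded partitions: ${stage.strandedPartitions}")
  }
}
```

In this model a non-empty strandedPartitions set with nothing schedulable corresponds to the hang in the report: the stage waits for a partition that only a zombie TaskSet has queued.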