[
https://issues.apache.org/jira/browse/SPARK-10796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
SuYan updated SPARK-10796:
--------------------------
Description:
{code}
test("Resubmit stage while lost partition in ZombieTaskSets or RemovedTaskSets") {
  val firstRDD = new MyRDD(sc, 3, Nil)
  val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
  val firstShuffleId = firstShuffleDep.shuffleId
  val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
  val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
  submit(reduceRdd, Array(0))

  // Things start out smoothly: stage 0 completes with no issues.
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
    (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
  ))

  // Then start running stage 1.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(0),
    Success,
    makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

  // Force stage 1 to be resubmitted. Note that in attempt 1.0,
  // partition 0 already finished on hostD, so the resubmitted
  // attempt 1.1 only contains tasks for partitions 1 and 2.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1),
    FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
  scheduler.resubmitFailedStages()
  val stage1Resubmit1 = taskSets(2)
  assert(stage1Resubmit1.stageId == 1)
  assert(stage1Resubmit1.tasks.size == 2)

  // Now exec-hostD is lost, so the output location of stage 1's
  // partition 0 is lost with it.
  runEvent(ExecutorLost("exec-hostD"))
  runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))

  // Let stage1Resubmit1 (attempt 1.1) complete.
  complete(taskSets(2), Seq(
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
  ))

  // And complete taskSet 1.0's still-running tasks.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1),
    Success,
    makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(2),
    Success,
    makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

  // Every TaskSet for stage 1 has now completed, yet partition 0
  // is still pending: the stage can never finish.
  assert(scheduler.runningStages.head.pendingPartitions.head == 0)
}
{code}
was:
We hit this problem in Spark 1.3.0, and after checking the latest Spark code I believe it still exists.
1. A running *ShuffleMapStage* may have multiple *TaskSets*: one active TaskSet and several zombie TaskSets.
2. A running *ShuffleMapStage* succeeds only once all of its partitions have been processed successfully, i.e. every task's *MapStatus* has been added to *outputLocs*.
3. The *MapStatus* entries of a running *ShuffleMapStage* may have been produced by zombie TaskSet 1 / zombie TaskSet 2 / ... / the active TaskSet. Some *MapStatus* entries may belong to only one TaskSet, and that TaskSet may be a zombie.
4. When an executor is lost, it can happen that some of the lost executor's *MapStatus* entries were produced by a zombie TaskSet. The current logic recovers a lost *MapStatus* by having each *TaskSet* re-run the tasks that succeeded on the lost executor: those tasks are re-added to the *TaskSet's pendingTasks*, and their partitions are re-added to the *Stage's pendingPartitions*. But this is useless when the lost *MapStatus* belongs only to a *zombie TaskSet*: because it is a zombie, its *pendingTasks* will never be scheduled.
5. A stage is only resubmitted when some task throws *FetchFailedException*. But the lost executor may not hold any *MapStatus* of a parent stage of any running stage, while a running stage happens to lose a *MapStatus* that belongs only to a zombie TaskSet. Once all zombie TaskSets have finished their running tasks and the active TaskSet has finished its pending tasks, they are all removed by *TaskSchedulerImpl*, yet the running stage's *pendingPartitions* set is still non-empty. The stage hangs forever.
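The interaction in points 4 and 5 can be sketched in a few lines. This is a minimal model, not Spark's real scheduler classes: `TaskSetModel` and `schedulableTasks` are hypothetical names invented for illustration. The only rule it encodes is the one the report relies on: a zombie TaskSet never launches its pending tasks, so a partition whose only successful *MapStatus* came from a zombie attempt on a lost executor is never rerun.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a TaskSetManager's scheduling state.
case class TaskSetModel(stageId: Int, isZombie: Boolean,
                        pendingTasks: mutable.Set[Int])

object ZombieTaskSetSketch {
  // Mirrors the scheduler rule: a zombie TaskSet launches nothing,
  // regardless of what sits in its pendingTasks.
  def schedulableTasks(ts: TaskSetModel): Set[Int] =
    if (ts.isZombie) Set.empty[Int] else ts.pendingTasks.toSet

  def main(args: Array[String]): Unit = {
    // Partition 0 is pending again after exec-hostD was lost.
    val stagePendingPartitions = mutable.Set(0)
    // Zombie attempt 1.0 had partition 0 re-added to its pendingTasks...
    val zombie = TaskSetModel(1, isZombie = true, mutable.Set(0))
    // ...but active attempt 1.1 was created without partition 0, because
    // partition 0 looked finished when 1.1 was built.
    val active = TaskSetModel(1, isZombie = false, mutable.Set.empty[Int])

    val runnable = schedulableTasks(zombie) ++ schedulableTasks(active)
    // runnable is empty while stagePendingPartitions is not: the stage hangs.
    println(s"pendingPartitions=$stagePendingPartitions runnable=$runnable")
  }
}
```

Under these assumptions, no TaskSet can ever produce a new *MapStatus* for partition 0, and since no *FetchFailedException* occurs, the stage is never resubmitted either.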
> A stage's TaskSets may all be removed while the stage still has pending
> partitions after losing some executors
> -----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-10796
> URL: https://issues.apache.org/jira/browse/SPARK-10796
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 1.3.0, 1.4.0, 1.5.0
> Reporter: SuYan
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)