[ 
https://issues.apache.org/jira/browse/SPARK-10796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-10796.
-------------------------------
    Resolution: Won't Fix

> The Stage taskSets may are all removed while stage still have pending 
> partitions after having lost some executors
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-10796
>                 URL: https://issues.apache.org/jira/browse/SPARK-10796
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0
>            Reporter: SuYan
>            Priority: Minor
>
> desc:
> 1. We know a running ShuffleMapStage will have multiple TaskSet: one Active 
> TaskSet, multiple Zombie TaskSet, and mutiple removedTaskSet
> We think a running ShuffleMapStage is success only if its partition are all 
> process success, namely each task‘s MapStatus are all add into outputLocs
> MapStatus of running ShuffleMapStage may succeed by RemovedTaskSet1../Zombie 
> TaskSet1 / Zombie TaskSet2 /..../ Active TaskSetN. So it had a chance that 
> some output only hold by some RemovedTaskset or ZombieTaskSet.
> If lost a executor, it chanced that some lost-executor related MapStatus are 
> succeed by some Zombie TaskSet.
> In current logical, The solution to resolved that lost MapStatus problem is,
> each TaskSet re-running that those tasks which succeed in lost-executor: 
> re-add into TaskSet's pendingTasks, 
> and re-add it paritions into Stage‘s pendingPartitions .
> but it is useless if that lost MapStatus only belong to Zombie/Removed 
> TaskSet, it is Zombie, so will never be scheduled his pendingTasks
> The condition for resubmit stage is only if some task throws 
> FetchFailedException, but may the lost-executor just not empty any MapStatus 
> of parent Stage for one of running Stages, 
> and it‘s happen to that running Stage was lost a MapStatus only belong to a 
> ZombieTask or removedTaskset.
> So if all Zombie TaskSets are all processed his runningTasks and Active 
> TaskSet are all processed his pendingTask, then will removed by 
> TaskSchedulerImp, then that running Stage's pending partitions is still 
> nonEmpty. it will hangs......
> {code}
>  test("Resubmit stage while lost partition in ZombieTasksets or 
> RemovedTaskSets") {
>     val firstRDD = new MyRDD(sc, 3, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, new 
> HashPartitioner(3))
>     val firstShuffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(3))
>     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
>     submit(reduceRdd, Array(0))
>     // things start out smoothly, stage 0 completes with no issues
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
>     ))
>     // then start running stage 1
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(0),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     // simulate make stage 1 resubmit, notice for stage1.0
>     // partitionId=1 already finished in hostD, so if we resubmit stage1,
>     // stage 1.1 only resubmit tasks for partitionId = 0,2
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(1),
>       FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
>     scheduler.resubmitFailedStages()
>     val stage1Resubmit1 = taskSets(2)
>     assert(stage1Resubmit1.stageId == 1)
>     assert(stage1Resubmit1.tasks.size == 2)
>     // now exec-hostD lost, so the output loc of stage1 partitionId=1 will 
> lost.
>     runEvent(ExecutorLost("exec-hostD"))
>     runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
>     // let stage1Resubmit1 complete
>     complete(taskSets(2), Seq(
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
>       (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
>     ))
>     // and let we complete tasksets1.0's active running Tasks
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(1),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     runEvent(makeCompletionEvent(
>       taskSets(1).tasks(2),
>       Success,
>       makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
>     // Now all runningTasksets for stage1 was all completed. 
>     assert(scheduler.runningStages.head.pendingPartitions.head == 0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to