[ 
https://issues.apache.org/jira/browse/SPARK-10796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SuYan updated SPARK-10796:
--------------------------
    Description: 

{code}

 test("Resubmit stage while lost partition in ZombieTasksets or 
RemovedTaskSets") {
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new 
HashPartitioner(3))
    val firstShuffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    submit(reduceRdd, Array(0))

    // things start out smoothly, stage 0 completes with no issues
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
    ))

    // then start running stage 1
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    // simulate a fetch failure so stage 1 is resubmitted; note that in
    // stage 1.0 partitionId=0 already finished on hostD, so the resubmitted
    // stage 1.1 only contains tasks for partitionId = 1, 2
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
    scheduler.resubmitFailedStages()

    val stage1Resubmit1 = taskSets(2)
    assert(stage1Resubmit1.stageId == 1)
    assert(stage1Resubmit1.tasks.size == 2)

    // now exec-hostD is lost, so the output loc of stage 1 partitionId=0 is lost.
    runEvent(ExecutorLost("exec-hostD"))
    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
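    // Resubmitted re-adds partition 0 to the stage's pendingPartitions, but
    // only the zombie taskset 1.0 contains a task for it, so that task will
    // never be launched again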

    // let stage1Resubmit1 complete
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
    ))

    // finally, let taskset 1.0's still-running tasks (partitions 1 and 2) complete
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    runEvent(makeCompletionEvent(
      taskSets(1).tasks(2),
      Success,
      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

    // Now every TaskSet of stage 1 has completed, yet partition 0 is still
    // pending and no TaskSet will ever run it again: the stage hangs.
    assert(scheduler.runningStages.head.pendingPartitions.head == 0)
  }

{code}
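
For readers who do not want to trace the whole test: the final assertion is the hang. The following is a minimal, self-contained sketch of the mechanism in plain Scala (hypothetical classes, not Spark's real scheduler types): a zombie TaskSet is never offered resources for its pending tasks, so a partition whose only owner is a zombie attempt can never leave the stage's pending set.

{code}
// Standalone model of the hang (illustrative names, not Spark API).
object ZombieHangSketch {
  // An attempt is "zombie" once a newer attempt of the same stage exists:
  // its running tasks may still finish, but its pending tasks are never launched.
  final case class Attempt(stageAttempt: Int, pending: Set[Int], zombie: Boolean) {
    def offer(): Option[Int] = if (zombie) None else pending.headOption
  }

  def main(args: Array[String]): Unit = {
    val pendingPartitions = Set(0)                               // partition 0 lost with exec-hostD
    val zombie = Attempt(0, pending = Set(0), zombie = true)     // stage 1.0
    val active = Attempt(1, pending = Set.empty, zombie = false) // stage 1.1, already finished

    // Partition 0 was re-added only to the zombie attempt, so no attempt
    // will ever launch it again, yet the stage still waits for it.
    assert(zombie.offer().isEmpty && active.offer().isEmpty)
    assert(pendingPartitions.nonEmpty)
    println(s"stage hangs; pendingPartitions = $pendingPartitions")
  }
}
{code}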




  was:
We hit this problem in Spark 1.3.0, and after checking the latest Spark code I believe it still exists.

1. A running *ShuffleMapStage* can have multiple *TaskSet*s: one active TaskSet and any number of zombie TaskSets.
2. A running *ShuffleMapStage* succeeds only when all of its partitions have been processed successfully, i.e. every task's *MapStatus* has been added into *outputLocs*.
3. The *MapStatus*es of a running *ShuffleMapStage* may be produced by any mix of zombie TaskSets and the active TaskSet, and some *MapStatus* may belong to exactly one TaskSet, possibly a zombie one.
4. When an executor is lost, it can happen that some of the lost *MapStatus*es were produced by a zombie TaskSet. The current logic recovers a lost *MapStatus* by having each *TaskSet* re-run the tasks that succeeded on the lost executor: those tasks are re-added into the *TaskSet*'s *pendingTasks*, and their partitions are re-added into the *Stage*'s *pendingPartitions*. This is useless when the lost *MapStatus* belongs only to a zombie *TaskSet*, because a zombie TaskSet never schedules its *pendingTasks* again.
5. A stage is resubmitted only when some task throws a *FetchFailedException*. But the lost executor may not invalidate any *MapStatus* of a parent stage of any running stage, while one running stage happens to lose a *MapStatus* that belongs only to a zombie TaskSet. In that case no *FetchFailedException* is ever thrown: once all zombie TaskSets have finished their running tasks and the active TaskSet has finished its pending tasks, *TaskSchedulerImpl* removes them all, yet the running stage's *pendingPartitions* is still non-empty, and the stage hangs forever (see the sketch after this list).
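
As a hedged sketch of the invariant point 4 violates (plain Scala with hypothetical types, not the actual Spark fix): before relying on task re-runs to recover a lost map output, the scheduler would have to check that some non-zombie attempt actually contains a task for that partition; if none does, the only safe recovery is to resubmit the stage itself.

{code}
// Sketch of a possible check (illustrative types, not Spark's API).
object ResubmitOnZombieOnlyLoss {
  final case class Attempt(partitions: Set[Int], zombie: Boolean)
  final case class Stage(id: Int, attempts: Seq[Attempt])

  // A lost partition is recoverable through task re-runs only if some
  // non-zombie attempt still has a task for it.
  def recoverableByRerun(stage: Stage, partition: Int): Boolean =
    stage.attempts.exists(a => !a.zombie && a.partitions.contains(partition))

  def main(args: Array[String]): Unit = {
    // The scenario from the test: attempt 1.0 (owner of partition 0) is
    // zombie, attempt 1.1 was created only for partitions 1 and 2.
    val stage1 = Stage(1, Seq(
      Attempt(partitions = Set(0, 1, 2), zombie = true),
      Attempt(partitions = Set(1, 2),    zombie = false)))

    // Partition 0 cannot be re-run by any live attempt, so the DAGScheduler
    // would need to resubmit the whole stage instead of hanging.
    assert(!recoverableByRerun(stage1, partition = 0))
    println(s"stage ${stage1.id}: resubmit required for partition 0")
  }
}
{code}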




> A stage's TaskSets may all be removed while the stage still has pending 
> partitions after losing some executors
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-10796
>                 URL: https://issues.apache.org/jira/browse/SPARK-10796
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0
>            Reporter: SuYan
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
