[ https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jin xing updated SPARK-19263:
-----------------------------
    Description: 
In the current *DAGScheduler handleTaskCompletion* code, when *event.reason* is 
*Success*, it first does *stage.pendingPartitions -= task.partitionId*, 
which may be a bug when *FetchFailed* happens. Consider the scenario below:

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It 
has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are 
launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the 
driver. The driver marks executorA as lost and updates failedEpoch.
4. The driver resubmits stage 0 so the missing output can be re-generated, and 
then once it completes, resubmits stage 1 with ShuffleMapTask1x and 
ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes 
on executorA and sends Success back to the driver. This causes 
DAGScheduler::handleTaskCompletion to remove partition 2 from 
stage.pendingPartitions (line 1149), but it does not add the partition to the 
set of output locations (line 1192), because the task's epoch is no newer than 
the failure epoch recorded for executorA (from the earlier failure in step 3).
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to 
remove partition 1 from stage.pendingPartitions. Combined with the previous 
step, this means there are no more pending partitions for the stage, so the 
DAGScheduler marks the stage as finished (line 1196). However, the shuffle 
stage is not available (line 1215), because the completion of ShuffleMapTask2 
was ignored due to its stale epoch, so the DAGScheduler resubmits the stage 
(this bookkeeping is modeled in the sketch after this list).
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is 
called for the re-submitted stage, it throws an IllegalStateException, because 
there is already an active task set for that stage.
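
The bookkeeping mismatch in steps 5 and 6 can be illustrated with a small 
self-contained model. Only *failedEpoch* and *pendingPartitions* follow the 
real code's names; everything else is a toy stand-in, not Spark's actual 
classes:

{code}
// Toy model of the two updates in DAGScheduler::handleTaskCompletion.
// The pending set always shrinks, but the map output is only registered
// when the task's epoch is newer than the executor's last failure.
object EpochBookkeepingDemo {
  import scala.collection.mutable

  val failedEpoch = mutable.Map[String, Long]()  // execId -> epoch of last failure
  val pendingPartitions = mutable.Set(1, 2)      // partitions the stage still awaits
  val outputLocs = mutable.Map[Int, String]()    // partitionId -> execId of its output

  def handleSuccess(partitionId: Int, execId: String, taskEpoch: Long): Unit = {
    pendingPartitions -= partitionId             // always removed (line 1149)
    if (failedEpoch.get(execId).exists(taskEpoch <= _))
      println(s"ignoring possibly bogus completion of partition $partitionId from $execId")
    else
      outputLocs(partitionId) = execId           // registered only if epoch is new enough (line 1192)
    if (pendingPartitions.isEmpty) {
      val available = outputLocs.keySet == Set(1, 2)
      println(s"stage marked finished (line 1196); available = $available") // false => resubmit
    }
  }

  def main(args: Array[String]): Unit = {
    failedEpoch("executorA") = 1                 // step 3: FetchFailed marks executorA failed
    handleSuccess(2, "executorA", taskEpoch = 0) // step 5: ShuffleMapTask2's stale completion
    handleSuccess(1, "executorB", taskEpoch = 2) // step 6: ShuffleMapTask1x completes
  }
}
{code}

Running it prints that partition 2's completion is ignored, yet the stage is 
still marked finished with available = false, which is exactly the state that 
triggers the resubmission in step 6.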

To reproduce the bug:
1. Modify *ShuffleBlockFetcherIterator* so that it throws a 
FetchFailedException when the task's index in the *TaskSetManager* and the 
stage attempt are both 0 (a sketch of this patch follows the job below);
2. Rebuild Spark, then submit the following job:
{code}
    // Two partitions => two shuffle map tasks per stage, matching the
    // ShuffleMapTask1/ShuffleMapTask2 scenario above.
    val rdd = sc.parallelize(
      List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
    rdd.reduceByKey { (v1, v2) =>
      Thread.sleep(10000) // keep map tasks running long enough for the race
      v1 + v2
    }.map { keyAndValue =>
      (keyAndValue._1 % 2, keyAndValue._2)
    }.reduceByKey { (v1, v2) =>
      Thread.sleep(10000)
      v1 + v2
    }.collect
{code}
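
A sketch of the *ShuffleBlockFetcherIterator* patch from step 1 follows. It is 
illustrative only: *taskIndex* and *stageAttemptId* are hypothetical handles 
that you would thread into the class when patching (it does not expose them 
directly), and the block coordinates passed to the exception just need to 
point at some block of the current shuffle:

{code}
// Hypothetical fault injection at the start of the fetch path:
// fail the first task (index 0) of the first stage attempt (attempt 0),
// exactly the condition described in step 1.
if (taskIndex == 0 && stageAttemptId == 0) {
  throw new FetchFailedException(
    blockManagerId,  // address the failure is attributed to
    shuffleId,       // shuffle being read
    0,               // mapId
    0,               // reduceId
    "Injected fetch failure to reproduce SPARK-19263")
}
{code}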

  was:
In the current *DAGScheduler handleTaskCompletion* code, when *event.reason* is 
*Success*, it first does *stage.pendingPartitions -= task.partitionId*, 
which may be a bug when *FetchFailed* happens. Consider the scenario below:
1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and 
ShuffleMapTask2;
2. ShuffleMapTask1 tries to fetch blocks locally but fails;
3. The driver receives the *FetchFailed* caused by ShuffleMapTask1 on executorA, 
marks executorA as lost, and updates *failedEpoch*;
4. The driver resubmits the stages, containing ShuffleMapTask1x and 
ShuffleMapTask2x;
5. ShuffleMapTask2 finishes successfully on executorA and sends *Success* back 
to the driver;
6. The driver receives *Success* and does *stage.pendingPartitions -= 
task.partitionId*, but then finds the task's epoch is not new enough (*<= 
failedEpoch(execId)*), treats the completion as bogus, and does not add the 
*MapStatus* to the stage;
7. ShuffleMapTask1x finishes successfully on executorB;
8. The driver receives *Success* from ShuffleMapTask1x on executorB and does 
*stage.pendingPartitions -= task.partitionId*, leaving no pending partitions, 
but then finds that not all partitions are available because of step 6;
9. The driver resubmits the stage, but at this moment ShuffleMapTask2x is still 
running; in *TaskSchedulerImpl submitTasks* it finds a *conflictingTaskSet* and 
throws an *IllegalStateException* (see the sketch after this list);
10. The job fails.
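
For reference, the guard that throws in step 9 lives in 
*TaskSchedulerImpl.submitTasks*. Below is a self-contained sketch, not Spark's 
actual code: *TaskSetInfo* and *submitCheck* are hypothetical stand-ins, 
though the exception message matches the real one:

{code}
// A task set submitted for a new stage attempt conflicts with any
// earlier attempt of the same stage that is not yet a zombie, i.e.
// still has tasks running (like ShuffleMapTask2x above).
case class TaskSetInfo(attemptId: Int, isZombie: Boolean)

def submitCheck(stageId: Int, previousAttempts: Seq[TaskSetInfo]): Unit = {
  if (previousAttempts.exists(ts => !ts.isZombie))
    throw new IllegalStateException(s"more than one active taskSet for stage $stageId")
}

// Step 9: the earlier attempt still runs ShuffleMapTask2x when the
// stage is resubmitted, so its task set is not a zombie:
submitCheck(1, Seq(TaskSetInfo(attemptId = 1, isZombie = false)))
// => java.lang.IllegalStateException: more than one active taskSet for stage 1
{code}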



> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>


