[
https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kay Ousterhout resolved SPARK-19263.
------------------------------------
Resolution: Fixed
Assignee: jin xing
Fix Version/s: 2.2.0
> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
> Key: SPARK-19263
> URL: https://issues.apache.org/jira/browse/SPARK-19263
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: jin xing
> Assignee: jin xing
> Fix For: 2.2.0
>
>
> In the current *DAGScheduler::handleTaskCompletion* code, when *event.reason* is
> *Success*, it first does *stage.pendingPartitions -= task.partitionId*,
> which can be a bug when a *FetchFailed* has already happened. Consider the following scenario:
> # Stage 0 runs and generates shuffle output data.
> # Stage 1 reads the output from stage 0 and generates more shuffle data. It
> has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are
> launched on executorA.
> # ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to
> the driver. The driver marks executorA as lost and updates failedEpoch;
> # The driver resubmits stage 0 so the missing output can be re-generated, and
> then once it completes, resubmits stage 1 with ShuffleMapTask1x and
> ShuffleMapTask2x.
> # ShuffleMapTask2 (from the original attempt of stage 1) successfully
> finishes on executorA and sends Success back to the driver. This causes
> DAGScheduler::handleTaskCompletion to remove partition 2 from
> stage.pendingPartitions (line 1149), but it does not add the partition to the
> set of output locations (line 1192), because the task's epoch is less than
> the failure epoch for the executor (because of the earlier failure on
> executorA).
> # ShuffleMapTask1x successfully finishes on executorB, causing the driver to
> remove partition 1 from stage.pendingPartitions. Combined with the previous
> step, this means that there are no more pending partitions for the stage, so
> the DAGScheduler marks the stage as finished (line 1196). However, the
> shuffle stage is not available (line 1215) because the completion for
> ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler
> resubmits the stage.
> # ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks
> is called for the re-submitted stage, it throws an error, because there is
> already an active task set for that stage. A minimal sketch of the bookkeeping
> mismatch behind steps 5 and 6 follows the list.
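>
> The sketch below is self-contained and uses simplified stand-ins, not Spark's actual classes, line numbers, or exact conditions; it only illustrates the two pieces of per-stage state described above: *pendingPartitions* is updated unconditionally, while the output location is recorded only when the task's epoch is not older than the executor's failure epoch.
> {code}
> // Minimal sketch of the bookkeeping mismatch (simplified stand-ins, not Spark's classes).
> object EpochMismatchSketch {
>   import scala.collection.mutable
>
>   case class TaskEnd(partitionId: Int, epoch: Long, execId: String)
>
>   val pendingPartitions = mutable.HashSet[Int]()    // partitions the stage still waits on
>   val outputLocs = mutable.HashMap[Int, String]()   // partition -> executor holding its map output
>   val failedEpoch = mutable.HashMap[String, Long]() // bumped when a FetchFailed marks an executor lost
>
>   def handleSuccess(task: TaskEnd): Unit = {
>     // (1) The partition is removed from pendingPartitions unconditionally ...
>     pendingPartitions -= task.partitionId
>     // (2) ... but the map output is registered only if the task's epoch is not older
>     //     than the executor's failure epoch; a late Success from the lost executor is dropped.
>     if (failedEpoch.get(task.execId).forall(task.epoch >= _)) {
>       outputLocs(task.partitionId) = task.execId
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     pendingPartitions ++= Seq(1, 2)
>     failedEpoch("executorA") = 1 // the FetchFailed from ShuffleMapTask1 marked executorA as lost
>     handleSuccess(TaskEnd(partitionId = 2, epoch = 0, execId = "executorA")) // late success of old ShuffleMapTask2
>     handleSuccess(TaskEnd(partitionId = 1, epoch = 1, execId = "executorB")) // success of retried ShuffleMapTask1x
>     // pendingPartitions is now empty, so the stage looks finished, but partition 2 has no
>     // output location: the stage is "finished" yet unavailable and gets resubmitted
>     // while ShuffleMapTask2x is still running.
>     println(s"pending = $pendingPartitions, outputLocs = $outputLocs")
>   }
> }
> {code}
>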
> To reproduce the bug:
> 1. Make a small modification to *ShuffleBlockFetcherIterator*: check whether
> the task's index in the *TaskSetManager* and the stage attempt are both equal
> to 0, and if so, throw a FetchFailedException (a sketch of this check follows
> the job below);
> 2. Rebuild Spark and then submit the following job:
> {code}
> // Two partitions, so each shuffle map stage runs two tasks
> // (ShuffleMapTask1 and ShuffleMapTask2 in the scenario above).
> val rdd = sc.parallelize(
>   List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
> rdd.reduceByKey {
>   (v1, v2) => {
>     // Sleep to keep tasks running long enough that tasks from the original
>     // stage attempt are still active when the stage is resubmitted.
>     Thread.sleep(10000)
>     v1 + v2
>   }
> }.map {
>   keyAndValue => {
>     (keyAndValue._1 % 2, keyAndValue._2)
>   }
> }.reduceByKey {
>   (v1, v2) => {
>     Thread.sleep(10000)
>     v1 + v2
>   }
> }.collect
> {code}
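>
> The following is a minimal, standalone sketch of the fault-injection check from step 1. The hook point inside *ShuffleBlockFetcherIterator* and the way the task index and stage attempt would be obtained there are left out because they are assumptions about Spark internals; only the condition itself comes from the description above.
> {code}
> object FaultInjectionSketch {
>   // Fail only stage attempt 0 of the task with index 0, so the retried task
>   // and the other partition's task succeed normally.
>   def shouldInjectFetchFailure(taskIndex: Int, stageAttempt: Int): Boolean =
>     taskIndex == 0 && stageAttempt == 0
>
>   def main(args: Array[String]): Unit = {
>     assert(shouldInjectFetchFailure(taskIndex = 0, stageAttempt = 0))  // original ShuffleMapTask1: FetchFailed
>     assert(!shouldInjectFetchFailure(taskIndex = 1, stageAttempt = 0)) // original ShuffleMapTask2: succeeds
>     assert(!shouldInjectFetchFailure(taskIndex = 0, stageAttempt = 1)) // resubmitted ShuffleMapTask1x: succeeds
>   }
> }
> {code}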
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)