[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418434#comment-16418434 ]

Li Yuanjian commented on SPARK-23811:
-------------------------------------

 

The scenario can be reproduced by the test case below, added in {{DAGSchedulerSuite}}:
{code:java}
/**
 * This tests the case where the original task succeeds after its speculative
 * attempt has already got a FetchFailed.
 */
test("[SPARK-23811] Fetch failed task should kill other attempt") {
  // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
  val rddA = new MyRDD(sc, 2, Nil)
  val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
  val shuffleIdA = shuffleDepA.shuffleId

  val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
  val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

  val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

  submit(rddC, Array(0, 1))

  // Complete both tasks in rddA.
  assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostA", 2)),
    (Success, makeMapStatus("hostB", 2))))

  // The first task of rddB's stage succeeds.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

  // The second task's speculative attempt fails first with a FetchFailed while
  // the original attempt is still running. This can be caused by an ExecutorLost.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1),
    FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    null))
  // Check the currently missing partition.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
  val missingPartition = mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get(0)

  // Shortly afterwards the original attempt of the second task succeeds.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
  // No missing partitions are left here, which causes the child stage to never succeed.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 0)
}
{code}
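
To make the bookkeeping easier to follow, here is a minimal, self-contained sketch of the ordering problem the test drives. This is only a toy model, not Spark code: {{StageState}} and its fields are made-up names for illustration. The FetchFailed marks the stage for resubmission while its partition is still missing, and the late Success from the original attempt then empties the missing-partition set, so the resubmitted stage has no task left to run.
{code:java}
// Toy model of the event ordering in SPARK-23811, NOT Spark's implementation.
// "StageState" and its fields are made-up names used only for illustration.
object Spark23811Sketch {
  final case class StageState(
      numPartitions: Int,
      completed: Set[Int] = Set.empty,
      markedForResubmit: Boolean = false) {

    def missingPartitions: Seq[Int] =
      (0 until numPartitions).filterNot(completed.contains)

    // A FetchFailed marks the stage for resubmission; the partition that
    // reported it is still considered missing at this point.
    def onFetchFailed(partition: Int): StageState =
      copy(completed = completed - partition, markedForResubmit = true)

    // A late Success from the original attempt registers the output and
    // shrinks the missing set, even though the stage is already marked failed.
    def onSuccess(partition: Int): StageState =
      copy(completed = completed + partition)
  }

  def main(args: Array[String]): Unit = {
    // Partition 0 already succeeded, partition 1 is still running.
    var stage = StageState(numPartitions = 2, completed = Set(0))

    stage = stage.onFetchFailed(1)
    println(s"after FetchFailed:  missing = ${stage.missingPartitions}") // List(1)

    stage = stage.onSuccess(1)
    println(s"after late Success: missing = ${stage.missingPartitions}") // List()

    // The stage is marked for resubmission but has nothing left to run,
    // so nothing ever drives the child stage forward.
    assert(stage.markedForResubmit && stage.missingPartitions.isEmpty)
  }
}
{code}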
 

> Same tasks' FetchFailed event comes before Success will cause child stage 
> never succeed
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-23811
>                 URL: https://issues.apache.org/jira/browse/SPARK-23811
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Li Yuanjian
>            Priority: Major
>         Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task fetches data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, so the shuffleStatus changes.
>  # The speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never succeed because there is no missing task the DAGScheduler can get.
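
The test name above ("Fetch failed task should kill other attempt") hints at one possible direction for a fix: once any attempt of a task reports a FetchFailed, the other still-running attempts of the same task could be killed and their late Success ignored, so they can no longer empty the missing-partition set of an already-failed stage. The sketch below only illustrates that idea under made-up names ({{AttemptTracker}}, {{onFetchFailed}}, {{shouldIgnoreSuccess}}); it is not Spark's TaskSetManager API.
{code:java}
// Toy sketch of "kill other attempts on FetchFailed"; all names are made up
// for illustration and this is not Spark's TaskSetManager.
final class AttemptTracker(numTasks: Int) {
  import scala.collection.mutable

  // Attempt numbers currently running, per task index.
  private val running = Array.fill(numTasks)(mutable.Set.empty[Int])
  private val fetchFailed = mutable.Set.empty[Int]

  def attemptStarted(task: Int, attempt: Int): Unit =
    running(task) += attempt

  // One attempt reported a FetchFailed: remember the task and return the
  // other still-running attempts so the caller can kill them.
  def onFetchFailed(task: Int, attempt: Int): Set[Int] = {
    fetchFailed += task
    running(task) -= attempt
    val toKill = running(task).toSet
    running(task).clear()
    toKill
  }

  // A late Success from another attempt of a fetch-failed task is ignored,
  // so it can no longer hide the missing partition from the resubmitted stage.
  def shouldIgnoreSuccess(task: Int): Boolean = fetchFailed.contains(task)
}
{code}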


