[ 
https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-5259:
--------------------------------
    Description: 
Track pending tasks by partition ID instead of Task objects.

Before this change, failure & retry could result in a case where a stage got 
submitted before the map output from its dependencies was registered. This was 
due to an error in the condition for registering map outputs.

More complete explanation of the original problem:

1. While a shuffle stage is being retried, there may be 2 task sets running at 
the same time.

Call the 2 task sets taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks 
of taskSet0.0 that have not completed.

Suppose taskSet0.0 then finishes all the tasks that taskSet0.1 has not 
completed yet, so that between the two task sets every partition is covered.

Then stage.isAvailable is true:
{code}
  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  } 
{code}

But stage.pendingTasks is not empty, which blocks registering the mapStatus in 
mapOutputTracker.

This is because, when a task completes successfully, it is removed from 
pendingTasks by reference identity, since Task does not override hashCode() 
and equals():
pendingTasks -= task

whereas numAvailableOutputs is tracked by partition ID.

Here is a test case that demonstrates the problem:

{code}
  test("Make sure mapStage.pendingtasks is set() " +
    "while MapStage.isAvailable is true while stage was retry ") {
    val firstRDD = new MyRDD(sc, 6, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
    val firstShuyffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostC", 3)),
      (Success, makeMapStatus("hostB", 4)),
      (Success, makeMapStatus("hostB", 5)),
      (Success, makeMapStatus("hostC", 6))
    ))
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1))
    ))
    runEvent(ExecutorLost("exec-hostA"))
    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, 
null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, 
null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(0),
      FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
      null, null, null, null))
    scheduler.resubmitFailedStages()
    runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
      makeMapStatus("hostB", 2), null, null, null))
    val stage = scheduler.stageIdToStage(taskSets(1).stageId)
    assert(stage.attemptId == 2)
    assert(stage.isAvailable)
    assert(stage.pendingTasks.size == 0)
  }


{code}


  was:
1. while shuffle stage was retry, there may have 2 taskSet running. 

we call the 2 taskSet:taskSet0.0, taskSet0.1, and we know, taskSet0.1 will 
re-run taskSet0.0's un-complete task

if taskSet0.0 was run all the task that the taskSet0.1 not complete yet but 
covered the partitions.

then stage is Available is true.
{code}
  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  } 
{code}

but stage.pending task is not empty, to protect register mapStatus in 
mapOutputTracker.

because if task is complete success, pendingTasks is minus Task in 
reference-level because the task is not override hashcode() and equals()
pendingTask -= task

but numAvailableOutputs is according to partitionID.

here is the testcase to prove:

{code}
  test("Make sure mapStage.pendingtasks is set() " +
    "while MapStage.isAvailable is true while stage was retry ") {
    val firstRDD = new MyRDD(sc, 6, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
    val firstShuyffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostC", 3)),
      (Success, makeMapStatus("hostB", 4)),
      (Success, makeMapStatus("hostB", 5)),
      (Success, makeMapStatus("hostC", 6))
    ))
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1))
    ))
    runEvent(ExecutorLost("exec-hostA"))
    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, 
null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, 
null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(0),
      FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
      null, null, null, null))
    scheduler.resubmitFailedStages()
    runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
      makeMapStatus("hostB", 2), null, null, null))
    val stage = scheduler.stageIdToStage(taskSets(1).stageId)
    assert(stage.attemptId == 2)
    assert(stage.isAvailable)
    assert(stage.pendingTasks.size == 0)
  }


{code}



> Do not submit stage until its dependencies map outputs are registered
> ---------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>            Priority: Critical
>
> Track pending tasks by partition ID instead of Task objects.
> Before this change, failure & retry could result in a case where a stage got 
> submitted before the map output from its dependencies was registered. This 
> was due to an error in the condition for registering map outputs.
> More complete explanation of the original problem:
> 1. While a shuffle stage is being retried, there may be 2 task sets running 
> at the same time.
> Call the 2 task sets taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks 
> of taskSet0.0 that have not completed.
> Suppose taskSet0.0 then finishes all the tasks that taskSet0.1 has not 
> completed yet, so that between the two task sets every partition is covered.
> Then stage.isAvailable is true:
> {code}
>   def isAvailable: Boolean = {
>     if (!isShuffleMap) {
>       true
>     } else {
>       numAvailableOutputs == numPartitions
>     }
>   } 
> {code}
> But stage.pendingTasks is not empty, which blocks registering the mapStatus 
> in mapOutputTracker.
> This is because, when a task completes successfully, it is removed from 
> pendingTasks by reference identity, since Task does not override hashCode() 
> and equals():
> pendingTasks -= task
> whereas numAvailableOutputs is tracked by partition ID.
> Here is a test case that demonstrates the problem:
> {code}
>   test("Make sure mapStage.pendingtasks is set() " +
>     "while MapStage.isAvailable is true while stage was retry ") {
>     val firstRDD = new MyRDD(sc, 6, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>     val firstShuyffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>     val shuffleId = shuffleDep.shuffleId
>     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>     submit(reduceRdd, Array(0, 1))
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostC", 3)),
>       (Success, makeMapStatus("hostB", 4)),
>       (Success, makeMapStatus("hostB", 5)),
>       (Success, makeMapStatus("hostC", 6))
>     ))
>     complete(taskSets(1), Seq(
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1))
>     ))
>     runEvent(ExecutorLost("exec-hostA"))
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, 
> null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, 
> null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(0),
>       FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
>       null, null, null, null))
>     scheduler.resubmitFailedStages()
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>       makeMapStatus("hostB", 2), null, null, null))
>     val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>     assert(stage.attemptId == 2)
>     assert(stage.isAvailable)
>     assert(stage.pendingTasks.size == 0)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to