Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212206542 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { + def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => + val numMissingPartitions = mapStage.findMissingPartitions().length + if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( + mapStage, + Some("preceding shuffle map stage with random output gets retried."), + willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage + } + + case resultStage => + val numMissingPartitions = resultStage.findMissingPartitions().length + if (numMissingPartitions < resultStage.numTasks) { --- End diff -- when running `first()`, I think `numTasks` will be 1, but `numPartitions` can be much larger.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org