Repository: spark
Updated Branches:
  refs/heads/master 326f1d672 -> 03fdc92e4
[SPARK-22681] Accumulator should only be updated once for each task in result stage

## What changes were proposed in this pull request?
As the doc says "For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will only be applied once, i.e. restarted tasks will not update the value." But currently the code doesn't guarantee this.

## How was this patch tested?
Newly added tests.

Author: Carson Wang <carson.w...@intel.com>

Closes #19877 from carsonwang/fixAccum.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03fdc92e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03fdc92e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03fdc92e

Branch: refs/heads/master
Commit: 03fdc92e42d260a2b7c0090115f162ba5c091aae
Parents: 326f1d6
Author: Carson Wang <carson.w...@intel.com>
Authored: Tue Dec 5 09:15:22 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Dec 5 09:15:22 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 14 +++++++++++---
 .../spark/scheduler/DAGSchedulerSuite.scala     | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9153751..c2498d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1187,9 +1187,17 @@ class DAGScheduler(
     // only updated in certain cases.
     event.reason match {
       case Success =>
-        stage match {
-          case rs: ResultStage if rs.activeJob.isEmpty =>
-            // Ignore update if task's job has finished.
+        task match {
+          case rt: ResultTask[_, _] =>
+            val resultStage = stage.asInstanceOf[ResultStage]
+            resultStage.activeJob match {
+              case Some(job) =>
+                // Only update the accumulator once for each result task.
+                if (!job.finished(rt.outputId)) {
+                  updateAccumulators(event)
+                }
+              case None => // Ignore update if task's job has finished.
+            }
           case _ =>
             updateAccumulators(event)
         }
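[Editor's note] Why the patch works: the old code skipped accumulator updates only when the result stage's activeJob was empty, i.e. when the whole job had already finished. A speculative or resubmitted copy of a single result task could therefore apply its update a second time while the job was still running. The new code keys the check on job.finished(rt.outputId), so a partition that has already delivered its result never updates accumulators again. Below is a minimal, self-contained Scala sketch of that per-partition deduplication pattern; the names (TaskResult, JobState, handleSuccess) are illustrative stand-ins, not Spark's internal API.

    // Illustrative sketch only; not Spark's internal classes.
    final case class TaskResult(outputId: Int, accumUpdate: Long)

    final class JobState(numPartitions: Int) {
      // One flag per output partition, mirroring ActiveJob.finished in spirit.
      private val finished = Array.fill(numPartitions)(false)
      private var accum = 0L

      // Apply the accumulator update only the first time a partition succeeds;
      // duplicate completions (speculative or resubmitted tasks) are ignored.
      def handleSuccess(result: TaskResult): Unit = {
        if (!finished(result.outputId)) {
          accum += result.accumUpdate
          finished(result.outputId) = true
        }
      }

      def accumValue: Long = accum
    }

    object AccumDedupSketch {
      def main(args: Array[String]): Unit = {
        val job = new JobState(numPartitions = 2)
        job.handleSuccess(TaskResult(outputId = 0, accumUpdate = 1L))
        // The same partition finishing again is a no-op for the accumulator.
        job.handleSuccess(TaskResult(outputId = 0, accumUpdate = 1L))
        job.handleSuccess(TaskResult(outputId = 1, accumUpdate = 1L))
        println(job.accumValue) // prints 2, not 3
      }
    }
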
http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index feefb6a..d812b5b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1832,6 +1832,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("accumulator not calculated for resubmitted task in result stage") {
+    val accum = AccumulatorSuite.createLongAccum("a")
+    val finalRdd = new MyRDD(sc, 2, Nil)
+    submit(finalRdd, Array(0, 1))
+    // finish the first task
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0))
+
+    // finish the first task again (simulate a speculative task or a resubmitted task)
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+
+    // The accumulator should only be updated once.
+    assert(accum.value === 1)
+
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success, 42))
+    assertDataStructuresEmpty()
+  }
+
   test("accumulators are updated on exception failures") {
     val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
     val acc2 = AccumulatorSuite.createLongAccum("boulanger")
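[Editor's note] The new test drives the DAGScheduler directly, completing partition 0 twice to simulate a speculative or resubmitted task. At the user level, the behavior it pins down is the documented guarantee for accumulators used inside actions. A minimal illustration follows; the local-mode master, accumulator name, and data sizes are placeholder choices, not from the commit.

    import org.apache.spark.{SparkConf, SparkContext}

    object AccumInActionExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("accum-once")
        val sc = new SparkContext(conf)

        val count = sc.longAccumulator("elements")
        // The update runs inside an action (foreach), so each task's total is
        // applied to the driver-side accumulator exactly once, even if a task
        // is speculatively re-run or resubmitted.
        sc.parallelize(1 to 100, 4).foreach(_ => count.add(1))
        println(count.value) // 100

        sc.stop()
      }
    }

Note that the guarantee the commit message quotes is scoped to actions: had count.add(1) run inside a transformation such as map, a re-executed task could apply its update more than once.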