Repository: spark
Updated Branches:
  refs/heads/master 326f1d672 -> 03fdc92e4


[SPARK-22681] Accumulator should only be updated once for each task in result stage

## What changes were proposed in this pull request?
As the documentation says, "For accumulator updates performed inside actions
only, Spark guarantees that each task’s update to the accumulator will only be
applied once, i.e. restarted tasks will not update the value." The current
code does not guarantee this: when a result task is resubmitted (for example,
a speculative task) and succeeds again, its accumulator update is applied a
second time.
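
For context, here is a minimal sketch of the documented guarantee, assuming a
running SparkContext named `sc` (the names and numbers are illustrative only,
not part of the patch):

```scala
// Accumulator updated inside an action: the docs promise exactly-once
// application per task, even when tasks are re-executed.
val acc = sc.longAccumulator("processed")
sc.parallelize(1 to 100, numSlices = 4).foreach { _ => acc.add(1) }
// Before this patch, a resubmitted or speculative result task that
// succeeded twice could apply its update twice, inflating the value.
assert(acc.value == 100L)
```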

## How was this patch tested?
Added new tests in DAGSchedulerSuite.

Author: Carson Wang <carson.w...@intel.com>

Closes #19877 from carsonwang/fixAccum.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03fdc92e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03fdc92e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03fdc92e

Branch: refs/heads/master
Commit: 03fdc92e42d260a2b7c0090115f162ba5c091aae
Parents: 326f1d6
Author: Carson Wang <carson.w...@intel.com>
Authored: Tue Dec 5 09:15:22 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Dec 5 09:15:22 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 14 +++++++++++---
 .../spark/scheduler/DAGSchedulerSuite.scala     | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9153751..c2498d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1187,9 +1187,17 @@ class DAGScheduler(
     // only updated in certain cases.
     event.reason match {
       case Success =>
-        stage match {
-          case rs: ResultStage if rs.activeJob.isEmpty =>
-            // Ignore update if task's job has finished.
+        task match {
+          case rt: ResultTask[_, _] =>
+            val resultStage = stage.asInstanceOf[ResultStage]
+            resultStage.activeJob match {
+              case Some(job) =>
+                // Only update the accumulator once for each result task.
+                if (!job.finished(rt.outputId)) {
+                  updateAccumulators(event)
+                }
+              case None => // Ignore update if task's job has finished.
+            }
           case _ =>
             updateAccumulators(event)
         }
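
A condensed model of the new decision logic, using simplified, hypothetical
types in place of the scheduler's real Task and ResultStage classes:

```scala
// Sketch: accumulator updates from a successful task are applied unless the
// task is a result task whose output partition the job has already finished.
sealed trait SimpleTask
final case class SimpleResultTask(outputId: Int) extends SimpleTask
case object SimpleShuffleMapTask extends SimpleTask

def shouldApplyAccumulatorUpdates(
    task: SimpleTask,
    activeJob: Option[Array[Boolean]]): Boolean = task match {
  case SimpleResultTask(outputId) =>
    activeJob match {
      case Some(finished) => !finished(outputId) // first completion of this partition only
      case None => false                         // job already finished; ignore late updates
    }
  case _ => true                                 // non-result tasks keep the old behavior
}
```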

http://git-wip-us.apache.org/repos/asf/spark/blob/03fdc92e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index feefb6a..d812b5b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1832,6 +1832,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("accumulator not calculated for resubmitted task in result stage") {
+    val accum = AccumulatorSuite.createLongAccum("a")
+    val finalRdd = new MyRDD(sc, 2, Nil)
+    submit(finalRdd, Array(0, 1))
+    // finish the first task
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0))
+
+    // finish the first task again (simulate a speculative task or a resubmitted task)
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+
+    // The accumulator should only be updated once.
+    assert(accum.value === 1)
+
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success, 42))
+    assertDataStructuresEmpty()
+  }
+
   test("accumulators are updated on exception failures") {
     val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
     val acc2 = AccumulatorSuite.createLongAccum("boulanger")


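The test completes the same partition twice; a toy model of the bookkeeping it
exercises (hypothetical names; in Spark the per-partition flags live in
ActiveJob.finished):

```scala
// First successful completion applies the update and marks the partition
// finished; a duplicate completion of the same partition is then a no-op.
val finished = Array.fill(2)(false)
var accumValue = 0L

def onResultTaskSuccess(outputId: Int): Unit = {
  if (!finished(outputId)) {
    accumValue += 1
    finished(outputId) = true
  }
}

onResultTaskSuccess(0) // first completion of partition 0
onResultTaskSuccess(0) // resubmitted/speculative duplicate is ignored
assert(accumValue == 1L) // mirrors the suite's assert(accum.value === 1)
```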