Repository: spark Updated Branches: refs/heads/master f4b138082 -> b6b8a6632
[SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator ## What changes were proposed in this pull request? Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining accumulators so that they can still report correct values. ## How was this patch tested? The new unit test. Closes #22586 from zsxwing/SPARK-25568. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: gatorsmile <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6b8a663 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6b8a663 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6b8a663 Branch: refs/heads/master Commit: b6b8a6632e2b6e5482aaf4bfa093700752a9df80 Parents: f4b1380 Author: Shixiong Zhu <[email protected]> Authored: Sat Sep 29 18:10:04 2018 -0700 Committer: gatorsmile <[email protected]> Committed: Sat Sep 29 18:10:04 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 20 ++++++++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 20 ++++++++++++++++++++ docs/rdd-programming-guide.md | 4 ++++ 3 files changed, 38 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4710835..f93d8a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1245,9 +1245,10 @@ private[spark] class DAGScheduler( private def updateAccumulators(event: CompletionEvent): Unit = { val task = event.task val stage = stageIdToStage(task.stageId) - try { - event.accumUpdates.foreach { updates => - val id = updates.id + + event.accumUpdates.foreach { updates => + val id = updates.id + try { // Find the corresponding accumulator on the driver and update it val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match { case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]] @@ -1261,10 +1262,17 @@ private[spark] class DAGScheduler( event.taskInfo.setAccumulables( acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables) } + } catch { + case NonFatal(e) => + // Log the class name to make it easy to find the bad implementation + val accumClassName = AccumulatorContext.get(id) match { + case Some(accum) => accum.getClass.getName + case None => "Unknown class" + } + logError( + s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}", + e) } - } catch { - case NonFatal(e) => - logError(s"Failed to update accumulators for task ${task.partitionId}", e) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d6c9ae6..b41d2ac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(sc.parallelize(1 to 10, 2).count() === 10) } + test("misbehaved accumulator should not impact other accumulators") { + val bad = new LongAccumulator { + override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = { + throw new DAGSchedulerSuiteDummyException + } + } + sc.register(bad, "bad") + val good = sc.longAccumulator("good") + + sc.parallelize(1 to 10, 2).foreach { item => + bad.add(1) + good.add(1) + } + + // This is to ensure the `bad` accumulator did fail to update its value + assert(bad.value == 0L) + // Should be able to update the "good" accumulator + assert(good.value == 10L) + } + /** * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException. * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException. http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/docs/rdd-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md index 0054257..9a07d6c 100644 --- a/docs/rdd-programming-guide.md +++ b/docs/rdd-programming-guide.md @@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1"); Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added. +*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator. +If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence, +a buggy accumulator will not impact a Spark job, but it may not get updated correctly although a Spark job is successful. + </div> <div data-lang="python" markdown="1"> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
