Repository: spark
Updated Branches:
  refs/heads/branch-2.2 708f68c8a -> 0bd918f67


[SPARK-12837][SPARK-20666][CORE][FOLLOWUP] getting name should not fail if 
accumulator is garbage collected

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/17596, we no longer send the internal 
accumulator name to the executor side, and instead always look up the accumulator 
name in `AccumulatorContext`.

This causes a regression if the accumulator has already been garbage collected; 
this PR fixes it by still sending the accumulator name for `SQLMetrics`.
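The failure mode can be sketched as follows. This is a simplified, hypothetical model (the names `MiniAccumulatorContext`, `Metadata`, `Acc`, and `simulateGc` are invented for illustration and are not Spark APIs): the real `AccumulatorContext` holds only weak references, so once an accumulator is garbage collected, a name lookup through the context returns `None`, while keeping the name in the locally held metadata and falling back to the context still works.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's accumulator metadata.
final case class Metadata(id: Long, name: Option[String])
final class Acc(val metadata: Metadata)

// A toy registry that, like the real AccumulatorContext, holds only weak
// references to registered accumulators.
object MiniAccumulatorContext {
  private val originals = mutable.Map.empty[Long, WeakReference[Acc]]

  def register(acc: Acc): Unit =
    originals(acc.metadata.id) = new WeakReference(acc)

  // Returns None if the accumulator was never registered or was collected.
  def get(id: Long): Option[Acc] =
    originals.get(id).flatMap(ref => Option(ref.get()))

  // Test hook: deterministically simulate the GC clearing the referent.
  def simulateGc(id: Long): Unit = originals.get(id).foreach(_.clear())
}

object Demo {
  // Pre-fix behavior: the name comes only from the context lookup,
  // which fails after the accumulator is collected.
  def nameViaContext(id: Long): Option[String] =
    MiniAccumulatorContext.get(id).flatMap(_.metadata.name)

  // Post-fix behavior: prefer the locally kept name, fall back to the context.
  def nameWithFallback(local: Metadata): Option[String] =
    local.name.orElse(nameViaContext(local.id))

  def main(args: Array[String]): Unit = {
    val acc = new Acc(Metadata(1L, Some("numOutputRows")))
    MiniAccumulatorContext.register(acc)
    println(nameViaContext(1L))             // Some(numOutputRows)
    MiniAccumulatorContext.simulateGc(1L)
    println(nameViaContext(1L))             // None: the regression
    println(nameWithFallback(acc.metadata)) // Some(numOutputRows): the fix
  }
}
```

The GC is simulated by clearing the weak reference directly, since triggering a real collection in a small example is nondeterministic.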

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #17931 from cloud-fan/bug.

(cherry picked from commit e1aaab1e277b1b07c26acea75ade78e39bdac209)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bd918f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bd918f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bd918f6

Branch: refs/heads/branch-2.2
Commit: 0bd918f67630f83cdc2922a2f48bd28b023ef821
Parents: 708f68c
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon May 15 09:22:06 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon May 15 09:22:24 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/AccumulatorV2.scala    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0bd918f6/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a65ec75..1a9a692 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   * Returns the name of this accumulator, can only be called after registration.
    */
   final def name: Option[String] = {
+    assertMetadataNotNull()
+
     if (atDriverSide) {
-      AccumulatorContext.get(id).flatMap(_.metadata.name)
+      metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
     } else {
-      assertMetadataNotNull()
       metadata.name
     }
   }
@@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
       }
       val copyAcc = copyAndReset()
       assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
-      val isInternalAcc =
-        (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
-          getClass.getSimpleName == "SQLMetric"
+      val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
       if (isInternalAcc) {
         // Do not serialize the name of internal accumulator and send it to executor.
         copyAcc.metadata = metadata.copy(name = None)
       } else {
+        // For non-internal accumulators, we still need to send the name because users may need to
+        // access the accumulator name at executor side, or they may keep the accumulators sent from
+        // executors and access the name when the registered accumulator is already garbage
+        // collected(e.g. SQLMetrics).
         copyAcc.metadata = metadata
       }
       copyAcc

