Repository: spark
Updated Branches:
  refs/heads/branch-2.3 28973e152 -> 895c95e5b


[SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.

## What changes were proposed in this pull request?

There is a window of time during which an accumulator has been garbage 
collected but has not yet been removed from AccumulatorContext.originals by 
ContextCleaner. When an update is received for such an accumulator, it throws 
an exception and kills the whole job. This can happen when a stage completes 
while there are still running tasks from other attempts, speculative 
execution, etc. Since AccumulatorContext.get() returns an Option, we can 
simply return None in that case.
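
For illustration, here is a minimal sketch of the lookup pattern this change adopts: a registry of weak references whose get() returns None both for unknown IDs and for already-collected referents, instead of throwing. The `WeakRegistry` object and its members are hypothetical names; the actual change to `AccumulatorContext.get()` is in the diff below.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for AccumulatorContext: IDs map to weak references,
// so registered objects remain eligible for garbage collection.
object WeakRegistry {
  private val originals = new ConcurrentHashMap[Long, WeakReference[AnyRef]]

  def register(id: Long, value: AnyRef): Unit =
    originals.put(id, new WeakReference[AnyRef](value))

  // Returns None for an unknown ID, and None (with a warning) when the
  // referent has been collected; a late update must not kill the job.
  def get(id: Long): Option[AnyRef] = {
    val ref = originals.get(id)
    if (ref eq null) {
      None
    } else {
      val value = ref.get
      if (value eq null) {
        // The real code calls logWarning from Spark's Logging trait.
        Console.err.println(s"Attempted to access garbage collected accumulator $id")
      }
      Option(value)
    }
  }
}
```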

## How was this patch tested?

Unit test.
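
For context, a sketch of the testing approach, using the hypothetical `WeakRegistry` from the sketch above (the real test is in AccumulatorSuite, shown in the diff below). System.gc() is only a hint to the JVM, but it is dependable enough for weak-reference tests like this:

```scala
// Hypothetical test body; the real one is in AccumulatorSuite below.
var acc: AnyRef = new Object
WeakRegistry.register(42L, acc)
assert(WeakRegistry.get(42L).isDefined)

// Drop the only strong reference and ask the JVM to collect the object.
acc = null
System.gc()

// After collection, get() returns None rather than throwing.
assert(WeakRegistry.get(42L).isEmpty)
```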

Author: Artem Rudoy <artem.ru...@gmail.com>

Closes #21114 from artemrd/SPARK-22371.

(cherry picked from commit 6c35865d949a8b46f654cd53c7e5f3288def18d0)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/895c95e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/895c95e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/895c95e5

Branch: refs/heads/branch-2.3
Commit: 895c95e5bde86099c0ed01edce00d5d87b192cbb
Parents: 28973e1
Author: Artem Rudoy <artem.ru...@gmail.com>
Authored: Thu May 17 18:49:46 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri May 18 11:09:47 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/AccumulatorV2.scala   | 14 +++++++++-----
 .../scala/org/apache/spark/AccumulatorSuite.scala     |  6 ++----
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/895c95e5/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 2bc8495..3b469a6 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 
 private[spark] case class AccumulatorMetadata(
@@ -211,7 +212,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
 /**
  * An internal class used to track accumulators by Spark itself.
  */
-private[spark] object AccumulatorContext {
+private[spark] object AccumulatorContext extends Logging {
 
   /**
   * This global map holds the original accumulator objects that are created on the driver.
@@ -258,13 +259,16 @@ private[spark] object AccumulatorContext {
    * Returns the [[AccumulatorV2]] registered with the given ID, if any.
    */
   def get(id: Long): Option[AccumulatorV2[_, _]] = {
-    Option(originals.get(id)).map { ref =>
-      // Since we are storing weak references, we must check whether the underlying data is valid.
+    val ref = originals.get(id)
+    if (ref eq null) {
+      None
+    } else {
+      // Since we are storing weak references, warn when the underlying data is not valid.
       val acc = ref.get
       if (acc eq null) {
-        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
+        logWarning(s"Attempted to access garbage collected accumulator $id")
       }
-      acc
+      Option(acc)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/895c95e5/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 3990ee1..5d0ffd9 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)
 
-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
 
     // Getting a normal accumulator. Note: this has to be separate because referencing an
     // accumulator above in an `assert` would keep it from being garbage collected.

