Repository: spark
Updated Branches:
  refs/heads/branch-1.5 eead87ef2 -> cdf781db6


[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference

`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to 
register itself with ContextCleaner, so `WeakReference`s for these accumulators 
in `Accumulators.originals` won't be removed.

This PR added `registerAccumulatorForCleanup` for internal accumulators to 
avoid the memory leak.

Author: zsxwing <[email protected]>

Closes #8108 from zsxwing/internal-accumulators-leak.

(cherry picked from commit f16bc68dfb25c7b746ae031a57840ace9bafa87f)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdf781db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdf781db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdf781db

Branch: refs/heads/branch-1.5
Commit: cdf781db6dc02f616099ff17ab00df7f9afceac7
Parents: eead87e
Author: zsxwing <[email protected]>
Authored: Tue Aug 11 14:06:23 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Aug 11 14:06:29 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 22 ++++++++++++--------
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../org/apache/spark/AccumulatorSuite.scala     |  3 ++-
 3 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cdf781db/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 064246d..c39c866 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+        // Execution memory refers to the memory used by internal data structures created
+        // during shuffles, aggregations and joins. The value of this accumulator should be
+        // approximately the sum of the peak sizes across all such data structures created
+        // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+        new Accumulator(
+          0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+      ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf781db/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index de05ee2..1cf0685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf781db/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 48f5495..0eb2293 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }
 
   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to