This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5c2a268  [SPARK-34087][3.1][SQL] Fix memory leak of 
ExecutionListenerBus
5c2a268 is described below

commit 5c2a26874ac816d81381e3ebd016b56def3a4355
Author: yi.wu <yi...@databricks.com>
AuthorDate: Fri Mar 19 13:23:06 2021 +0800

    [SPARK-34087][3.1][SQL] Fix memory leak of ExecutionListenerBus
    
    This PR proposes an alternative way to fix the memory leak of 
`ExecutionListenerBus`, which would automatically clean them up.
    
    Basically, the idea is to add `registerSparkListenerForCleanup` to 
`ContextCleaner`, so we can remove the `ExecutionListenerBus` from 
`LiveListenerBus` when the `SparkSession` is GC'ed.
    
    On the other hand, to make the `SparkSession` GC-able, we need to get rid 
of the reference of `SparkSession` in `ExecutionListenerBus`. Therefore, we 
introduced the `sessionUUID`, which is a unique identifier for SparkSession, to 
replace the  `SparkSession` object.
    
    Note that, the proposal wouldn't take effect when 
`spark.cleaner.referenceTracking=false` since it depends on `ContextCleaner`.
    
    Fix the memory leak caused by `ExecutionListenerBus` mentioned in 
SPARK-34087.
    
    Yes, save memory for users.
    
    Added unit test.
    
    Closes #31839 from Ngone51/fix-mem-leak-of-ExecutionListenerBus.
    
    Authored-by: yi.wu <yi.wudatabricks.com>
    Signed-off-by: HyukjinKwon <gurwls223apache.org>
    
    Closes #31881 from Ngone51/SPARK-34087-3.1.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/ContextCleaner.scala    | 21 +++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  3 ++
 .../spark/sql/util/QueryExecutionListener.scala    | 12 ++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 35 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 7c3d6d9..5da4679 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, 
Utils}
 
@@ -39,6 +40,7 @@ private case class CleanShuffle(shuffleId: Int) extends 
CleanupTask
 private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
 private case class CleanAccum(accId: Long) extends CleanupTask
 private case class CleanCheckpoint(rddId: Int) extends CleanupTask
+private case class CleanSparkListener(listener: SparkListener) extends 
CleanupTask
 
 /**
  * A WeakReference associated with a CleanupTask.
@@ -175,6 +177,13 @@ private[spark] class ContextCleaner(
     referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, 
referenceQueue))
   }
 
+  /** Register a SparkListener to be cleaned up when its owner is garbage 
collected. */
+  def registerSparkListenerForCleanup(
+      listenerOwner: AnyRef,
+      listener: SparkListener): Unit = {
+    registerForCleanup(listenerOwner, CleanSparkListener(listener))
+  }
+
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
@@ -197,6 +206,8 @@ private[spark] class ContextCleaner(
                 doCleanupAccum(accId, blocking = blockOnCleanupTasks)
               case CleanCheckpoint(rddId) =>
                 doCleanCheckpoint(rddId)
+              case CleanSparkListener(listener) =>
+                doCleanSparkListener(listener)
             }
           }
         }
@@ -272,6 +283,16 @@ private[spark] class ContextCleaner(
     }
   }
 
+  def doCleanSparkListener(listener: SparkListener): Unit = {
+    try {
+      logDebug(s"Cleaning Spark listener $listener")
+      sc.listenerBus.removeListener(listener)
+      logDebug(s"Cleaned Spark listener $listener")
+    } catch {
+      case e: Exception => logError(s"Error cleaning Spark listener 
$listener", e)
+    }
+  }
+
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f89e58c..28b3a0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.Closeable
+import java.util.UUID
 import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
@@ -100,6 +101,8 @@ class SparkSession private(
         new SparkSessionExtensions))
   }
 
+  private[sql] val sessionUUID: String = UUID.randomUUID.toString
+
   sparkContext.assertNotStopped()
 
   // If there is no active SparkSession, uses the default SQL conf. Otherwise, 
use the session's.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 0b5951e..d8b630d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -76,7 +76,11 @@ trait QueryExecutionListener {
 class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
   extends Logging {
 
-  private val listenerBus = new ExecutionListenerBus(session)
+  private val listenerBus = new ExecutionListenerBus(session.sessionUUID)
+  session.sparkContext.listenerBus.addToSharedQueue(listenerBus)
+  session.sparkContext.cleaner.foreach { cleaner =>
+    cleaner.registerSparkListenerForCleanup(this, listenerBus)
+  }
 
   if (loadExtensions) {
     val conf = session.sparkContext.conf
@@ -124,11 +128,9 @@ class ExecutionListenerManager private[sql](session: 
SparkSession, loadExtension
   }
 }
 
-private[sql] class ExecutionListenerBus(session: SparkSession)
+private[sql] class ExecutionListenerBus(sessionUUID: String)
   extends SparkListener with ListenerBus[QueryExecutionListener, 
SparkListenerSQLExecutionEnd] {
 
-  session.sparkContext.listenerBus.addToSharedQueue(this)
-
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case e: SparkListenerSQLExecutionEnd => postToAll(e)
     case _ =>
@@ -158,6 +160,6 @@ private[sql] class ExecutionListenerBus(session: 
SparkSession)
   private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean = {
     // Only catch SQL execution with a name, and triggered by the same spark 
session that this
     // listener manager belongs.
-    e.executionName.isDefined && e.qe.sparkSession.eq(this.session)
+    e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 159d2c0..aff4e4d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,18 +17,23 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.util.ExecutionListenerBus
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
  */
-class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
+class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach 
with Eventually {
 
   override def afterEach(): Unit = {
     // This suite should not interfere with the other test suites.
@@ -38,6 +43,34 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     SparkSession.clearDefaultSession()
   }
 
+  test("SPARK-34087: Fix memory leak of ExecutionListenerBus") {
+    val spark = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+
+    @inline def listenersNum(): Int = {
+      spark.sparkContext
+        .listenerBus
+        .listeners
+        .asScala
+        .count(_.isInstanceOf[ExecutionListenerBus])
+    }
+
+    (1 to 10).foreach { _ =>
+      spark.cloneSession()
+      SparkSession.clearActiveSession()
+    }
+
+    eventually(timeout(10.seconds), interval(1.seconds)) {
+      System.gc()
+      // After GC, the number of ExecutionListenerBus should be less than 11 
(we created 11
+      // SparkSessions in total).
+      // Since GC can't 100% guarantee all out-of-referenced objects be 
cleaned at one time,
+      // here, we check at least one listener is cleaned up to prove the 
mechanism works.
+      assert(listenersNum() < 11)
+    }
+  }
+
   test("create with config options and propagate them to SparkContext and 
SparkSession") {
     val session = SparkSession.builder()
       .master("local")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to