This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5c2a268 [SPARK-34087][3.1][SQL] Fix memory leak of ExecutionListenerBus 5c2a268 is described below commit 5c2a26874ac816d81381e3ebd016b56def3a4355 Author: yi.wu <yi...@databricks.com> AuthorDate: Fri Mar 19 13:23:06 2021 +0800 [SPARK-34087][3.1][SQL] Fix memory leak of ExecutionListenerBus What changes were proposed: This PR proposes an alternative way to fix the memory leak of `ExecutionListenerBus`, one that cleans the leaked listener buses up automatically. Basically, the idea is to add `registerSparkListenerForCleanup` to `ContextCleaner`, so we can remove the `ExecutionListenerBus` from `LiveListenerBus` when the `SparkSession` is GC'ed. On the other hand, to make the `SparkSession` GC-able, we need to get rid of the reference to `SparkSession` in `ExecutionListenerBus`. Therefore, we introduced the `sessionUUID`, which is a unique identifier for SparkSession, to replace the `SparkSession` object. Note that the proposal wouldn't take effect when `spark.cleaner.referenceTracking=false` since it depends on `ContextCleaner`. Why the changes are needed: Fix the memory leak caused by `ExecutionListenerBus` mentioned in SPARK-34087. User-facing change: Yes, it saves memory for users. How it was tested: Added a unit test. Closes #31839 from Ngone51/fix-mem-leak-of-ExecutionListenerBus. Authored-by: yi.wu <yi.wudatabricks.com> Signed-off-by: HyukjinKwon <gurwls223apache.org> Closes #31881 from Ngone51/SPARK-34087-3.1. 
Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/ContextCleaner.scala | 21 +++++++++++++ .../scala/org/apache/spark/sql/SparkSession.scala | 3 ++ .../spark/sql/util/QueryExecutionListener.scala | 12 ++++---- .../spark/sql/SparkSessionBuilderSuite.scala | 35 +++++++++++++++++++++- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 7c3d6d9..5da4679 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} +import org.apache.spark.scheduler.SparkListener import org.apache.spark.shuffle.api.ShuffleDriverComponents import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils} @@ -39,6 +40,7 @@ private case class CleanShuffle(shuffleId: Int) extends CleanupTask private case class CleanBroadcast(broadcastId: Long) extends CleanupTask private case class CleanAccum(accId: Long) extends CleanupTask private case class CleanCheckpoint(rddId: Int) extends CleanupTask +private case class CleanSparkListener(listener: SparkListener) extends CleanupTask /** * A WeakReference associated with a CleanupTask. @@ -175,6 +177,13 @@ private[spark] class ContextCleaner( referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) } + /** Register a SparkListener to be cleaned up when its owner is garbage collected. */ + def registerSparkListenerForCleanup( + listenerOwner: AnyRef, + listener: SparkListener): Unit = { + registerForCleanup(listenerOwner, CleanSparkListener(listener)) + } + /** Keep cleaning RDD, shuffle, and broadcast state. 
*/ private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { while (!stopped) { @@ -197,6 +206,8 @@ private[spark] class ContextCleaner( doCleanupAccum(accId, blocking = blockOnCleanupTasks) case CleanCheckpoint(rddId) => doCleanCheckpoint(rddId) + case CleanSparkListener(listener) => + doCleanSparkListener(listener) } } } @@ -272,6 +283,16 @@ private[spark] class ContextCleaner( } } + def doCleanSparkListener(listener: SparkListener): Unit = { + try { + logDebug(s"Cleaning Spark listener $listener") + sc.listenerBus.removeListener(listener) + logDebug(s"Cleaned Spark listener $listener") + } catch { + case e: Exception => logError(s"Error cleaning Spark listener $listener", e) + } + } + private def broadcastManager = sc.env.broadcastManager private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f89e58c..28b3a0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.Closeable +import java.util.UUID import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} @@ -100,6 +101,8 @@ class SparkSession private( new SparkSessionExtensions)) } + private[sql] val sessionUUID: String = UUID.randomUUID.toString + sparkContext.assertNotStopped() // If there is no active SparkSession, uses the default SQL conf. Otherwise, use the session's. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 0b5951e..d8b630d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -76,7 +76,11 @@ trait QueryExecutionListener { class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) extends Logging { - private val listenerBus = new ExecutionListenerBus(session) + private val listenerBus = new ExecutionListenerBus(session.sessionUUID) + session.sparkContext.listenerBus.addToSharedQueue(listenerBus) + session.sparkContext.cleaner.foreach { cleaner => + cleaner.registerSparkListenerForCleanup(this, listenerBus) + } if (loadExtensions) { val conf = session.sparkContext.conf @@ -124,11 +128,9 @@ class ExecutionListenerManager private[sql](session: SparkSession, loadExtension } } -private[sql] class ExecutionListenerBus(session: SparkSession) +private[sql] class ExecutionListenerBus(sessionUUID: String) extends SparkListener with ListenerBus[QueryExecutionListener, SparkListenerSQLExecutionEnd] { - session.sparkContext.listenerBus.addToSharedQueue(this) - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionEnd => postToAll(e) case _ => @@ -158,6 +160,6 @@ private[sql] class ExecutionListenerBus(session: SparkSession) private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean = { // Only catch SQL execution with a name, and triggered by the same spark session that this // listener manager belongs. 
- e.executionName.isDefined && e.qe.sparkSession.eq(this.session) + e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 159d2c0..aff4e4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -17,18 +17,23 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.util.ExecutionListenerBus /** * Test cases for the builder pattern of [[SparkSession]]. */ -class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { +class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach with Eventually { override def afterEach(): Unit = { // This suite should not interfere with the other test suites. 
@@ -38,6 +43,34 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { SparkSession.clearDefaultSession() } + test("SPARK-34087: Fix memory leak of ExecutionListenerBus") { + val spark = SparkSession.builder() + .master("local") + .getOrCreate() + + @inline def listenersNum(): Int = { + spark.sparkContext + .listenerBus + .listeners + .asScala + .count(_.isInstanceOf[ExecutionListenerBus]) + } + + (1 to 10).foreach { _ => + spark.cloneSession() + SparkSession.clearActiveSession() + } + + eventually(timeout(10.seconds), interval(1.seconds)) { + System.gc() + // After GC, the number of ExecutionListenerBus should be less than 11 (we created 11 + // SparkSessions in total). + // Since GC can't 100% guarantee all out-of-referenced objects be cleaned at one time, + // here, we check at least one listener is cleaned up to prove the mechanism works. + assert(listenersNum() < 11) + } + } + test("create with config options and propagate them to SparkContext and SparkSession") { val session = SparkSession.builder() .master("local") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org