This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5f46bee [SPARK-34087][3.1][SQL] Fix memory leak of ExecutionListenerBus
5f46bee is described below
commit 5f46bee4d09a655e38a89f9134138f1826855870
Author: yi.wu <[email protected]>
AuthorDate: Fri Mar 19 13:23:06 2021 +0800
[SPARK-34087][3.1][SQL] Fix memory leak of ExecutionListenerBus
### What changes were proposed in this pull request?
This PR proposes an alternative way to fix the memory leak of
`ExecutionListenerBus` by cleaning up the leaked instances automatically.
Basically, the idea is to add `registerSparkListenerForCleanup` to
`ContextCleaner`, so we can remove the `ExecutionListenerBus` from
`LiveListenerBus` when the `SparkSession` is GC'ed.
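For context, `ContextCleaner` already tracks RDDs, shuffles, and broadcasts this way; the new hook simply reuses that machinery. A minimal sketch of the underlying weak-reference pattern (hypothetical names, not the actual Spark code; see the `ContextCleaner.scala` diff below for the real implementation):

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Simplified model: track the owner weakly; once the owner is GC'ed, its
// reference surfaces on the queue and the associated cleanup task runs.
class CleanerSketch {
  private val queue = new ReferenceQueue[AnyRef]

  // A WeakReference that carries the cleanup task alongside the owner.
  private class TaskRef(owner: AnyRef, val cleanup: () => Unit)
    extends WeakReference[AnyRef](owner, queue)

  // Keep the TaskRefs themselves strongly reachable until processed.
  private val buffer = mutable.Set.empty[TaskRef]

  def registerForCleanup(owner: AnyRef, cleanup: () => Unit): Unit =
    buffer += new TaskRef(owner, cleanup)

  // In Spark this loop runs on a daemon thread; shown here as a single step.
  def drainOnce(): Unit = Option(queue.poll()).foreach {
    case ref: TaskRef =>
      buffer -= ref
      ref.cleanup() // e.g. listenerBus.removeListener(listener)
  }
}
```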
To make the `SparkSession` GC-able in the first place, we also need to get rid
of the reference to the `SparkSession` held by `ExecutionListenerBus`. Therefore,
we introduce `sessionUUID`, a unique identifier for each `SparkSession`, to
replace the `SparkSession` object.
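The effect is easiest to see in `shouldReport`: the listener used to compare session objects, which pins the session in memory, and now compares captured strings (paraphrased from the `QueryExecutionListener.scala` diff below):

```scala
// Before: the bus holds the session, keeping it strongly reachable forever.
private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean =
  e.executionName.isDefined && e.qe.sparkSession.eq(this.session)

// After: only an immutable String is captured, so the session can be GC'ed.
private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean =
  e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID
```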
Note that the fix does not take effect when
`spark.cleaner.referenceTracking=false`, since it depends on `ContextCleaner`.
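For illustration, reference tracking is on by default, so a session would only miss the automatic cleanup if the cleaner were explicitly disabled (illustrative snippet; the config key is real):

```scala
// spark.cleaner.referenceTracking=false means SparkContext creates no
// ContextCleaner, so registerSparkListenerForCleanup is never invoked.
val spark = SparkSession.builder()
  .master("local")
  .config("spark.cleaner.referenceTracking", "false")
  .getOrCreate()
```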
### Why are the changes needed?
Fix the memory leak caused by `ExecutionListenerBus` mentioned in
SPARK-34087.
### Does this PR introduce _any_ user-facing change?
Yes, it saves memory for users.
### How was this patch tested?
Added a unit test.
Closes #31839 from Ngone51/fix-mem-leak-of-ExecutionListenerBus.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Closes #31881 from Ngone51/SPARK-34087-3.1.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/ContextCleaner.scala | 21 +++++++++++++
.../scala/org/apache/spark/sql/SparkSession.scala | 3 ++
.../spark/sql/util/QueryExecutionListener.scala | 12 ++++----
.../spark/sql/SparkSessionBuilderSuite.scala | 35 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index cfa1139..34b3089 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.scheduler.SparkListener
import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
@@ -39,6 +40,7 @@ private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask
+private case class CleanSparkListener(listener: SparkListener) extends CleanupTask
/**
* A WeakReference associated with a CleanupTask.
@@ -175,6 +177,13 @@ private[spark] class ContextCleaner(
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
+  /** Register a SparkListener to be cleaned up when its owner is garbage collected. */
+  def registerSparkListenerForCleanup(
+      listenerOwner: AnyRef,
+      listener: SparkListener): Unit = {
+    registerForCleanup(listenerOwner, CleanSparkListener(listener))
+  }
+
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
@@ -197,6 +206,8 @@ private[spark] class ContextCleaner(
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
+        case CleanSparkListener(listener) =>
+          doCleanSparkListener(listener)
}
}
}
@@ -276,6 +287,16 @@ private[spark] class ContextCleaner(
}
}
+  def doCleanSparkListener(listener: SparkListener): Unit = {
+    try {
+      logDebug(s"Cleaning Spark listener $listener")
+      sc.listenerBus.removeListener(listener)
+      logDebug(s"Cleaned Spark listener $listener")
+    } catch {
+      case e: Exception => logError(s"Error cleaning Spark listener $listener", e)
+    }
+  }
+
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 50e9dee..dae1bc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.io.Closeable
+import java.util.UUID
import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
@@ -102,6 +103,8 @@ class SparkSession private(
new SparkSessionExtensions), Map.empty)
}
+  private[sql] val sessionUUID: String = UUID.randomUUID.toString
+
sparkContext.assertNotStopped()
// If there is no active SparkSession, uses the default SQL conf. Otherwise, use the session's.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 0b5951e..d8b630d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -76,7 +76,11 @@ trait QueryExecutionListener {
class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
  extends Logging {
-  private val listenerBus = new ExecutionListenerBus(session)
+  private val listenerBus = new ExecutionListenerBus(session.sessionUUID)
+  session.sparkContext.listenerBus.addToSharedQueue(listenerBus)
+  session.sparkContext.cleaner.foreach { cleaner =>
+    cleaner.registerSparkListenerForCleanup(this, listenerBus)
+  }
if (loadExtensions) {
val conf = session.sparkContext.conf
@@ -124,11 +128,9 @@ class ExecutionListenerManager private[sql](session: SparkSession, loadExtension
}
}
-private[sql] class ExecutionListenerBus(session: SparkSession)
+private[sql] class ExecutionListenerBus(sessionUUID: String)
  extends SparkListener with ListenerBus[QueryExecutionListener, SparkListenerSQLExecutionEnd] {
- session.sparkContext.listenerBus.addToSharedQueue(this)
-
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case e: SparkListenerSQLExecutionEnd => postToAll(e)
case _ =>
@@ -158,6 +160,6 @@ private[sql] class ExecutionListenerBus(session: SparkSession)
private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean = {
// Only catch SQL execution with a name, and triggered by the same spark session that this
// listener manager belongs.
- e.executionName.isDefined && e.qe.sparkSession.eq(this.session)
+ e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 1f16bb6..c729e1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,19 +17,24 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
+
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.util.ExecutionListenerBus
import org.apache.spark.util.ThreadUtils
/**
* Test cases for the builder pattern of [[SparkSession]].
*/
-class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
+class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach with Eventually {
override def afterEach(): Unit = {
// This suite should not interfere with the other test suites.
@@ -39,6 +44,34 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
SparkSession.clearDefaultSession()
}
+ test("SPARK-34087: Fix memory leak of ExecutionListenerBus") {
+ val spark = SparkSession.builder()
+ .master("local")
+ .getOrCreate()
+
+ @inline def listenersNum(): Int = {
+ spark.sparkContext
+ .listenerBus
+ .listeners
+ .asScala
+ .count(_.isInstanceOf[ExecutionListenerBus])
+ }
+
+ (1 to 10).foreach { _ =>
+ spark.cloneSession()
+ SparkSession.clearActiveSession()
+ }
+
+ eventually(timeout(10.seconds), interval(1.seconds)) {
+ System.gc()
+ // After GC, the number of ExecutionListenerBus should be less than 11
(we created 11
+ // SparkSessions in total).
+ // Since GC can't 100% guarantee all out-of-referenced objects be
cleaned at one time,
+ // here, we check at least one listener is cleaned up to prove the
mechanism works.
+ assert(listenersNum() < 11)
+ }
+ }
+
test("create with config options and propagate them to SparkContext and
SparkSession") {
val session = SparkSession.builder()
.master("local")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]