This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 869fc2198a4 [SPARK-39864][SQL] Lazily register ExecutionListenerBus
869fc2198a4 is described below
commit 869fc2198a4bb51bc03dce36fb2b61a57fe3006e
Author: Josh Rosen <[email protected]>
AuthorDate: Wed Jul 27 14:25:54 2022 +0900
[SPARK-39864][SQL] Lazily register ExecutionListenerBus
### What changes were proposed in this pull request?
This PR modifies `ExecutionListenerManager` so that its
`ExecutionListenerBus` SparkListener is lazily registered during the first
`.register(QueryExecutionListener)` call (instead of eagerly registering it in the
constructor).
### Why are the changes needed?
This addresses a ListenerBus performance problem in applications with large
numbers of short-lived SparkSessions.
The `ExecutionListenerBus` SparkListener is unregistered by the
ContextCleaner after its associated ExecutionListenerManager/SparkSession is
garbage-collected (see #31839). If many sessions are rapidly created and
destroyed but the driver GC doesn't run, then this can result in a large number of
unused ExecutionListenerBus listeners being registered on the shared
ListenerBus queue. This can cause performance problems in the ListenerBus
because each listener invocation has some overhead.
In one real-world application with a very large driver heap and a high rate
of SparkSession creation (hundreds per minute), I saw 5000 idle
ExecutionListenerBus listeners, resulting in ~50ms median event processing
times on the shared listener queue.
This patch avoids this problem by making the listener registration lazy: if
a short-lived SparkSession never uses QueryExecutionListeners then we won't
register the ExecutionListenerBus and won't incur these overheads.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test.
Closes #37282 from JoshRosen/SPARK-39864.
Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/util/QueryExecutionListener.scala | 20 ++++++++++++++------
.../sql/util/ExecutionListenerManagerSuite.scala | 15 +++++++++++++++
2 files changed, 29 insertions(+), 6 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 7ac06a5cd7e..45482f12f3c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -81,7 +81,10 @@ class ExecutionListenerManager private[sql](
loadExtensions: Boolean)
extends Logging {
- private val listenerBus = new ExecutionListenerBus(this, session)
+ // SPARK-39864: lazily create the listener bus on the first register() call
in order to
+ // avoid listener overheads when QueryExecutionListeners aren't used:
+ private val listenerBusInitializationLock = new Object()
+ @volatile private var listenerBus: Option[ExecutionListenerBus] = None
if (loadExtensions) {
val conf = session.sparkContext.conf
@@ -97,7 +100,12 @@ class ExecutionListenerManager private[sql](
*/
@DeveloperApi
def register(listener: QueryExecutionListener): Unit = {
- listenerBus.addListener(listener)
+ listenerBusInitializationLock.synchronized {
+ if (listenerBus.isEmpty) {
+ listenerBus = Some(new ExecutionListenerBus(this, session))
+ }
+ }
+ listenerBus.get.addListener(listener)
}
/**
@@ -105,7 +113,7 @@ class ExecutionListenerManager private[sql](
*/
@DeveloperApi
def unregister(listener: QueryExecutionListener): Unit = {
- listenerBus.removeListener(listener)
+ listenerBus.foreach(_.removeListener(listener))
}
/**
@@ -113,12 +121,12 @@ class ExecutionListenerManager private[sql](
*/
@DeveloperApi
def clear(): Unit = {
- listenerBus.removeAllListeners()
+ listenerBus.foreach(_.removeAllListeners())
}
/** Only exposed for testing. */
private[sql] def listListeners(): Array[QueryExecutionListener] = {
- listenerBus.listeners.asScala.toArray
+
listenerBus.map(_.listeners.asScala.toArray).getOrElse(Array.empty[QueryExecutionListener])
}
/**
@@ -127,7 +135,7 @@ class ExecutionListenerManager private[sql](
private[sql] def clone(session: SparkSession, sqlConf: SQLConf):
ExecutionListenerManager = {
val newListenerManager =
new ExecutionListenerManager(session, sqlConf, loadExtensions = false)
- listenerBus.listeners.asScala.foreach(newListenerManager.register)
+
listenerBus.foreach(_.listeners.asScala.foreach(newListenerManager.register))
newListenerManager
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
index 2ab733eac0b..56219766f70 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
@@ -69,6 +69,21 @@ class ExecutionListenerManagerSuite extends SparkFunSuite
with LocalSparkSession
assert(INSTANCE_COUNT.get() === 1)
assert(CALLBACK_COUNT.get() === 2)
}
+
+ test("SPARK-39864 ExecutionListenerBus is lazily registered") {
+ spark =
SparkSession.builder().master("local").appName("test").getOrCreate()
+ // Run a query to trigger the lazy initialization of the session state:
+ spark.sql("select 1").collect()
+ // The ExecutionListenerBus shouldn't be registered since no
QueryExecutionListeners
+ // are registered:
+
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().isEmpty)
+ // Registering the first query execution listener registers a listener bus:
+ spark.listenerManager.register(new CountingQueryExecutionListener)
+
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size
== 1)
+ // Registering additional listeners reuses the same listener bus:
+ spark.listenerManager.register(new CountingQueryExecutionListener)
+
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size
== 1)
+ }
}
private class CountingQueryExecutionListener extends QueryExecutionListener {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]