This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7a01ba65b74 [SPARK-45061][SS][CONNECT] Clean up running Python StreamingQueryListener processes when session expires
7a01ba65b74 is described below
commit 7a01ba65b7408bc3b907aa7b0b27279913caafe9
Author: Wei Liu <[email protected]>
AuthorDate: Mon Sep 4 09:36:49 2023 +0900
[SPARK-45061][SS][CONNECT] Clean up running Python StreamingQueryListener processes when session expires
### What changes were proposed in this pull request?
Clean up all running Python StreamingQueryListener processes (on the server) when the session expires.
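For context (not part of this commit), the sketch below shows the kind of client code that creates such a process: when a Python Spark Connect client registers a StreamingQueryListener, the server starts a long-lived Python listener process for it. The endpoint and listener class are illustrative assumptions, not taken from this patch.

```python
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

# Illustrative Spark Connect endpoint; adjust to your server.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Hypothetical example listener; any StreamingQueryListener subclass behaves the same way.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("query started:", event.id)

    def onQueryProgress(self, event):
        print("batch finished:", event.progress.batchId)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        print("query terminated:", event.id)

# Registering the listener is what starts the server-side Python listener process
# that this change now stops when the session expires.
spark.streams.addListener(MyListener())
```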
### Why are the changes needed?
Improvement: without this cleanup, Python listener processes started on the server would keep running after the session expires.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A test will be added in SPARK-44462. Currently there is no way to test this, because the session never expires: the Python listener process started on the server establishes a connection back to the server using the same session id and pings it continuously, which keeps the session alive.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42687 from WweiL/SPARK-44433-followup-listener-cleanup.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/planner/SparkConnectPlanner.scala | 4 +++-
.../spark/sql/connect/service/SessionHolder.scala | 21 ++++++++++++++++-----
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 11300631491..579b378d09f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2900,7 +2900,9 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
     SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query)
     // Register the runner with the query if Python foreachBatch is enabled.
     foreachBatchRunnerCleaner.foreach { cleaner =>
-      sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache.registerCleanerForQuery(
+        query,
+        cleaner)
     }

     executeHolder.eventsManager.postFinished()
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 2034a97fce9..1cef02d7e34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -57,7 +57,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     new ConcurrentHashMap()

   // Handles Python process clean up for streaming queries. Initialized on first use in a query.
-  private[connect] lazy val streamingRunnerCleanerCache =
+  private[connect] lazy val streamingForeachBatchRunnerCleanerCache =
     new StreamingForeachBatchHelper.CleanerCache(this)

   /** Add ExecuteHolder to this session. Called only by SparkConnectExecutionManager. */
@@ -160,7 +160,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     eventManager.postClosed()
     // Clean up running queries
     SparkConnectService.streamingSessionManager.cleanupRunningQueries(this)
-    streamingRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+    streamingForeachBatchRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+    removeAllListeners() // removes all listener and stop python listener processes if necessary.
   }

   /**
@@ -237,11 +238,21 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Spark Connect PythonStreamingQueryListener.
    */
   private[connect] def removeCachedListener(id: String): Unit = {
-    listenerCache.get(id) match {
-      case pyListener: PythonStreamingQueryListener => pyListener.stopListenerProcess()
+    Option(listenerCache.remove(id)) match {
+      case Some(pyListener: PythonStreamingQueryListener) => pyListener.stopListenerProcess()
       case _ => // do nothing
     }
-    listenerCache.remove(id)
+  }
+
+  /**
+   * Stop all streaming listener threads, and removes all python process if applicable. Only
+   * called when session is expired.
+   */
+  private def removeAllListeners(): Unit = {
+    listenerCache.forEach((id, listener) => {
+      session.streams.removeListener(listener)
+      removeCachedListener(id)
+    })
   }

   /**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]