This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a9fcfb5e0f40 [SPARK-54664][CONNECT] Clean up the code related to
`listenerCache` from `connect.StreamingQueryManager`
a9fcfb5e0f40 is described below
commit a9fcfb5e0f403825cdd16c951382a53f7f1bd02b
Author: yangjie01 <[email protected]>
AuthorDate: Wed Dec 10 09:08:54 2025 -0800
[SPARK-54664][CONNECT] Clean up the code related to `listenerCache` from
`connect.StreamingQueryManager`
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/41752 introduced a `listenerCache` and
related private methods (`cacheListenerById`, `getIdByListener`, and
`removeCachedListener`) for `connect.StreamingQueryManager`. However, in
https://github.com/apache/spark/pull/46287, the usage related to
`listenerCache` was replaced by `streamingQueryListenerBus`. As a result,
`listenerCache` and its associated private methods are no longer in use, and
this current pr cleans them up.
### Why are the changes needed?
Code cleanup.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass Github Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53420 from LuciferYang/StreamingQueryManager.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connect/StreamingQueryManager.scala | 24 ----------------------
1 file changed, 24 deletions(-)
diff --git
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
index ac864a1292c8..da3669bc69fd 100644
---
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
+++
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
@@ -18,14 +18,12 @@
package org.apache.spark.sql.connect
import java.util.UUID
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand,
StreamingQueryManagerCommandResult}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.streaming
import org.apache.spark.sql.streaming.{StreamingQueryException,
StreamingQueryListener}
@@ -39,15 +37,6 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession)
extends streaming.StreamingQueryManager
with Logging {
- // Mapping from id to StreamingQueryListener. There's another mapping from
id to
- // StreamingQueryListener on server side. This is used by removeListener()
to find the id
- // of previously added StreamingQueryListener and pass it to server side to
find the
- // corresponding listener on server side. We use id to
StreamingQueryListener mapping
- // here to make sure there's no hash collision as well as handling the case
that adds and
- // removes the same listener instance multiple times properly.
- private lazy val listenerCache: ConcurrentMap[String,
StreamingQueryListener] =
- new ConcurrentHashMap()
-
private[spark] val streamingQueryListenerBus = new
StreamingQueryListenerBus(sparkSession)
private[spark] def close(): Unit = {
@@ -128,17 +117,4 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession)
resp.getStreamingQueryManagerCommandResult
}
-
- private def cacheListenerById(id: String, listener: StreamingQueryListener):
Unit = {
- listenerCache.putIfAbsent(id, listener)
- }
-
- private def getIdByListener(listener: StreamingQueryListener): String = {
- listenerCache.forEach((k, v) => if (listener.equals(v)) return k)
- throw InvalidPlanInput(s"No id with listener $listener is found.")
- }
-
- private def removeCachedListener(id: String): StreamingQueryListener = {
- listenerCache.remove(id)
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]