This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a9fcfb5e0f40 [SPARK-54664][CONNECT] Clean up the code related to 
`listenerCache` from `connect.StreamingQueryManager`
a9fcfb5e0f40 is described below

commit a9fcfb5e0f403825cdd16c951382a53f7f1bd02b
Author: yangjie01 <[email protected]>
AuthorDate: Wed Dec 10 09:08:54 2025 -0800

    [SPARK-54664][CONNECT] Clean up the code related to `listenerCache` from 
`connect.StreamingQueryManager`
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/41752 introduced a `listenerCache` and 
related private methods (`cacheListenerById`, `getIdByListener`, and 
`removeCachedListener`) for `connect.StreamingQueryManager`. However, in 
https://github.com/apache/spark/pull/46287, the usage related to 
`listenerCache` was replaced by `streamingQueryListenerBus`. As a result, 
`listenerCache` and its associated private methods are no longer in use, and 
this PR cleans them up.
    
    ### Why are the changes needed?
    Code cleanup.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GitHub Actions.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53420 from LuciferYang/StreamingQueryManager.
    
    Lead-authored-by: yangjie01 <[email protected]>
    Co-authored-by: YangJie <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/connect/StreamingQueryManager.scala  | 24 ----------------------
 1 file changed, 24 deletions(-)

diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
index ac864a1292c8..da3669bc69fd 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala
@@ -18,14 +18,12 @@
 package org.apache.spark.sql.connect
 
 import java.util.UUID
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, 
StreamingQueryManagerCommandResult}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.streaming
 import org.apache.spark.sql.streaming.{StreamingQueryException, 
StreamingQueryListener}
 
@@ -39,15 +37,6 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession)
     extends streaming.StreamingQueryManager
     with Logging {
 
-  // Mapping from id to StreamingQueryListener. There's another mapping from 
id to
-  // StreamingQueryListener on server side. This is used by removeListener() 
to find the id
-  // of previously added StreamingQueryListener and pass it to server side to 
find the
-  // corresponding listener on server side. We use id to 
StreamingQueryListener mapping
-  // here to make sure there's no hash collision as well as handling the case 
that adds and
-  // removes the same listener instance multiple times properly.
-  private lazy val listenerCache: ConcurrentMap[String, 
StreamingQueryListener] =
-    new ConcurrentHashMap()
-
   private[spark] val streamingQueryListenerBus = new 
StreamingQueryListenerBus(sparkSession)
 
   private[spark] def close(): Unit = {
@@ -128,17 +117,4 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession)
 
     resp.getStreamingQueryManagerCommandResult
   }
-
-  private def cacheListenerById(id: String, listener: StreamingQueryListener): 
Unit = {
-    listenerCache.putIfAbsent(id, listener)
-  }
-
-  private def getIdByListener(listener: StreamingQueryListener): String = {
-    listenerCache.forEach((k, v) => if (listener.equals(v)) return k)
-    throw InvalidPlanInput(s"No id with listener $listener is found.")
-  }
-
-  private def removeCachedListener(id: String): StreamingQueryListener = {
-    listenerCache.remove(id)
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to