This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d813f5467e93 [SPARK-49417][CONNECT][SQL] Add Shared StreamingQueryManager interface
d813f5467e93 is described below

commit d813f5467e930ed4a22d2ea6aa4333cf379ea7f9
Author: Herman van Hovell <[email protected]>
AuthorDate: Fri Sep 27 19:41:37 2024 -0400

    [SPARK-49417][CONNECT][SQL] Add Shared StreamingQueryManager interface
    
    ### What changes were proposed in this pull request?
    This PR adds a shared StreamingQueryManager interface.
    
    ### Why are the changes needed?
    We are working on a shared Scala SQL interface for Classic and Connect. This change is part of this work.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48217 from hvanhovell/SPARK-49417.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |   1 +
 .../sql/streaming/StreamingQueryManager.scala      |  93 +++------------
 .../org/apache/spark/sql/api/SparkSession.scala    |  11 +-
 .../spark/sql/api/StreamingQueryManager.scala      | 130 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |   7 +-
 .../sql/streaming/StreamingQueryManager.scala      |  95 +++------------
 6 files changed, 169 insertions(+), 168 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 1b41566ca1d1..b31670c1da57 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -212,6 +212,7 @@ class SparkSession private[sql] (
   /** @inheritdoc */
   def readStream: DataStreamReader = new DataStreamReader(this)
 
+  /** @inheritdoc */
   lazy val streams: StreamingQueryManager = new StreamingQueryManager(this)
 
   /** @inheritdoc */
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 7efced227d6d..647d29c714db 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -27,7 +27,7 @@ import org.apache.spark.connect.proto.Command
 import org.apache.spark.connect.proto.StreamingQueryManagerCommand
 import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{api, SparkSession}
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 
 /**
@@ -36,7 +36,9 @@ import org.apache.spark.sql.connect.common.InvalidPlanInput
  * @since 3.5.0
  */
 @Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession)
+    extends api.StreamingQueryManager
+    with Logging {
 
   // Mapping from id to StreamingQueryListener. There's another mapping from 
id to
   // StreamingQueryListener on server side. This is used by removeListener() 
to find the id
@@ -53,29 +55,17 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
     streamingQueryListenerBus.close()
   }
 
-  /**
-   * Returns a list of active queries associated with this SQLContext
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def active: Array[StreamingQuery] = {
     
executeManagerCmd(_.setActive(true)).getActive.getActiveQueriesList.asScala.map 
{ q =>
       RemoteStreamingQuery.fromStreamingQueryInstanceResponse(sparkSession, q)
     }.toArray
   }
 
-  /**
-   * Returns the query if there is an active query with the given id, or null.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def get(id: UUID): StreamingQuery = get(id.toString)
 
-  /**
-   * Returns the query if there is an active query with the given id, or null.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def get(id: String): StreamingQuery = {
     val response = executeManagerCmd(_.setGetQuery(id))
     if (response.hasQuery) {
@@ -85,52 +75,13 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
     }
   }
 
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the creation
-   * of the context, or since `resetTerminated()` was called. If any query was 
terminated with an
-   * exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return immediately (if the query was terminated by `query.stop()`), or 
throw the exception
-   * immediately (if the query was terminated with exception). Use 
`resetTerminated()` to clear
-   * past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since 
`resetTermination()` was called, if
-   * any query has terminated with exception, then `awaitAnyTermination()` 
will throw any of the
-   * exception. For correctly documenting exceptions across multiple queries, 
users need to stop
-   * all of them after any of them terminates with exception, and then check 
the
-   * `query.exception()` for each query.
-   *
-   * @throws StreamingQueryException
-   *   if any query has terminated with an exception
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   @throws[StreamingQueryException]
   def awaitAnyTermination(): Unit = {
     executeManagerCmd(_.getAwaitAnyTerminationBuilder.build())
   }
 
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the creation
-   * of the context, or since `resetTerminated()` was called. Returns whether 
any query has
-   * terminated or not (multiple may have terminated). If any query has 
terminated with an
-   * exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return `true` immediately (if the query was terminated by 
`query.stop()`), or throw the
-   * exception immediately (if the query was terminated with exception). Use 
`resetTerminated()`
-   * to clear past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since 
`resetTermination()` was called, if
-   * any query has terminated with exception, then `awaitAnyTermination()` 
will throw any of the
-   * exception. For correctly documenting exceptions across multiple queries, 
users need to stop
-   * all of them after any of them terminates with exception, and then check 
the
-   * `query.exception()` for each query.
-   *
-   * @throws StreamingQueryException
-   *   if any query has terminated with an exception
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   @throws[StreamingQueryException]
   def awaitAnyTermination(timeoutMs: Long): Boolean = {
     require(timeoutMs > 0, "Timeout has to be positive")
@@ -139,40 +90,22 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
         timeoutMs)).getAwaitAnyTermination.getTerminated
   }
 
-  /**
-   * Forget about past terminated queries so that `awaitAnyTermination()` can 
be used again to
-   * wait for new terminations.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def resetTerminated(): Unit = {
     executeManagerCmd(_.setResetTerminated(true))
   }
 
-  /**
-   * Register a [[StreamingQueryListener]] to receive up-calls for life cycle 
events of
-   * [[StreamingQuery]].
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def addListener(listener: StreamingQueryListener): Unit = {
     streamingQueryListenerBus.append(listener)
   }
 
-  /**
-   * Deregister a [[StreamingQueryListener]].
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def removeListener(listener: StreamingQueryListener): Unit = {
     streamingQueryListenerBus.remove(listener)
   }
 
-  /**
-   * List all [[StreamingQueryListener]]s attached to this 
[[StreamingQueryManager]].
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def listListeners(): Array[StreamingQueryListener] = {
     streamingQueryListenerBus.list()
   }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 0f73a94c3c4a..4dfeb87a11d9 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -25,7 +25,7 @@ import _root_.java.lang
 import _root_.java.net.URI
 import _root_.java.util
 
-import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SparkClassUtils
@@ -93,6 +93,15 @@ abstract class SparkSession extends Serializable with 
Closeable {
    */
   def udf: UDFRegistration
 
+  /**
+   * Returns a `StreamingQueryManager` that allows managing all the `StreamingQuery`s active on
+   * `this`.
+   *
+   * @since 2.0.0
+   */
+  @Unstable
+  def streams: StreamingQueryManager
+
   /**
    * Start a new session with isolated SQL configurations, temporary tables, 
registered functions
    * are isolated, but sharing the underlying `SparkContext` and cached data.
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala
new file mode 100644
index 000000000000..88ba9a493d06
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import _root_.java.util.UUID
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamingQueryListener}
+
+/**
+ * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
+ *
+ * @since 2.0.0
+ */
+@Evolving
+abstract class StreamingQueryManager {
+
+  /**
+   * Returns a list of active queries associated with this SQLContext
+   *
+   * @since 2.0.0
+   */
+  def active: Array[_ <: StreamingQuery]
+
+  /**
+   * Returns the query if there is an active query with the given id, or null.
+   *
+   * @since 2.1.0
+   */
+  def get(id: UUID): StreamingQuery
+
+  /**
+   * Returns the query if there is an active query with the given id, or null.
+   *
+   * @since 2.1.0
+   */
+  def get(id: String): StreamingQuery
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated 
since the creation
+   * of the context, or since `resetTerminated()` was called. If any query was 
terminated with an
+   * exception, then the exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
+   * return immediately (if the query was terminated by `query.stop()`), or 
throw the exception
+   * immediately (if the query was terminated with exception). Use 
`resetTerminated()` to clear
+   * past terminations and wait for new terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+   * any query has terminated with exception, then `awaitAnyTermination()` will throw any of the
+   * exceptions. For correctly documenting exceptions across multiple queries, users need to stop
+   * all of them after any of them terminates with exception, and then check the
+   * `query.exception()` for each query.
+   *
+   * @throws org.apache.spark.sql.streaming.StreamingQueryException
+   *   if any query has terminated with an exception
+   * @since 2.0.0
+   */
+  @throws[StreamingQueryException]
+  def awaitAnyTermination(): Unit
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated 
since the creation
+   * of the context, or since `resetTerminated()` was called. Returns whether 
any query has
+   * terminated or not (multiple may have terminated). If any query has 
terminated with an
+   * exception, then the exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
+   * return `true` immediately (if the query was terminated by 
`query.stop()`), or throw the
+   * exception immediately (if the query was terminated with exception). Use 
`resetTerminated()`
+   * to clear past terminations and wait for new terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+   * any query has terminated with exception, then `awaitAnyTermination()` will throw any of the
+   * exceptions. For correctly documenting exceptions across multiple queries, users need to stop
+   * all of them after any of them terminates with exception, and then check the
+   * `query.exception()` for each query.
+   *
+   * @throws org.apache.spark.sql.streaming.StreamingQueryException
+   *   if any query has terminated with an exception
+   * @since 2.0.0
+   */
+  @throws[StreamingQueryException]
+  def awaitAnyTermination(timeoutMs: Long): Boolean
+
+  /**
+   * Forget about past terminated queries so that `awaitAnyTermination()` can 
be used again to
+   * wait for new terminations.
+   *
+   * @since 2.0.0
+   */
+  def resetTerminated(): Unit
+
+  /**
+   * Register a [[org.apache.spark.sql.streaming.StreamingQueryListener]] to 
receive up-calls for
+   * life cycle events of [[StreamingQuery]].
+   *
+   * @since 2.0.0
+   */
+  def addListener(listener: StreamingQueryListener): Unit
+
+  /**
+   * Deregister a [[org.apache.spark.sql.streaming.StreamingQueryListener]].
+   *
+   * @since 2.0.0
+   */
+  def removeListener(listener: StreamingQueryListener): Unit
+
+  /**
+   * List all [[org.apache.spark.sql.streaming.StreamingQueryListener]]s 
attached to this
+   * [[StreamingQueryManager]].
+   *
+   * @since 3.0.0
+   */
+  def listListeners(): Array[StreamingQueryListener]
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 983cc24718fd..eeb46fbf145d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -229,12 +229,7 @@ class SparkSession private(
   @Unstable
   def dataSource: DataSourceRegistration = sessionState.dataSourceRegistration
 
-  /**
-   * Returns a `StreamingQueryManager` that allows managing all the
-   * `StreamingQuery`s active on `this`.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @Unstable
   def streams: StreamingQueryManager = sessionState.streamingQueryManager
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 9d6fd2e28dea..42f6d04466b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{api, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, 
WriteToStreamStatement}
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, 
Table, TableCatalog}
@@ -47,7 +47,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 @Evolving
 class StreamingQueryManager private[sql] (
     sparkSession: SparkSession,
-    sqlConf: SQLConf) extends Logging {
+    sqlConf: SQLConf)
+  extends api.StreamingQueryManager
+  with Logging {
 
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
@@ -70,7 +72,7 @@ class StreamingQueryManager private[sql] (
    *   failed. The exception is the exception of the last failed query.
    */
   @GuardedBy("awaitTerminationLock")
-  private var lastTerminatedQueryException: Option[StreamingQueryException] = null
+  private var lastTerminatedQueryException: Option[StreamingQueryException] = _
 
   try {
     sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { 
classNames =>
@@ -90,51 +92,20 @@ class StreamingQueryManager private[sql] (
       throw QueryExecutionErrors.registeringStreamingQueryListenerError(e)
   }
 
-  /**
-   * Returns a list of active queries associated with this SQLContext
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
     activeQueries.values.toArray
   }
 
-  /**
-   * Returns the query if there is an active query with the given id, or null.
-   *
-   * @since 2.1.0
-   */
+  /** @inheritdoc */
   def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
     activeQueries.get(id).orNull
   }
 
-  /**
-   * Returns the query if there is an active query with the given id, or null.
-   *
-   * @since 2.1.0
-   */
+  /** @inheritdoc */
   def get(id: String): StreamingQuery = get(UUID.fromString(id))
 
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the
-   * creation of the context, or since `resetTerminated()` was called. If any 
query was terminated
-   * with an exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with 
exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new 
terminations.
-   *
-   * In the case where multiple queries have terminated since 
`resetTermination()` was called,
-   * if any query has terminated with exception, then `awaitAnyTermination()` 
will
-   * throw any of the exception. For correctly documenting exceptions across 
multiple queries,
-   * users need to stop all of them after any of them terminates with 
exception, and then check the
-   * `query.exception()` for each query.
-   *
-   * @throws StreamingQueryException if any query has terminated with an 
exception
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @throws[StreamingQueryException]
   def awaitAnyTermination(): Unit = {
     awaitTerminationLock.synchronized {
@@ -147,27 +118,7 @@ class StreamingQueryManager private[sql] (
     }
   }
 
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated 
since the
-   * creation of the context, or since `resetTerminated()` was called. Returns 
whether any query
-   * has terminated or not (multiple may have terminated). If any query has 
terminated with an
-   * exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
-   * return `true` immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with 
exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new 
terminations.
-   *
-   * In the case where multiple queries have terminated since 
`resetTermination()` was called,
-   * if any query has terminated with exception, then `awaitAnyTermination()` 
will
-   * throw any of the exception. For correctly documenting exceptions across 
multiple queries,
-   * users need to stop all of them after any of them terminates with 
exception, and then check the
-   * `query.exception()` for each query.
-   *
-   * @throws StreamingQueryException if any query has terminated with an 
exception
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @throws[StreamingQueryException]
   def awaitAnyTermination(timeoutMs: Long): Boolean = {
 
@@ -187,42 +138,24 @@ class StreamingQueryManager private[sql] (
     }
   }
 
-  /**
-   * Forget about past terminated queries so that `awaitAnyTermination()` can 
be used again to
-   * wait for new terminations.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def resetTerminated(): Unit = {
     awaitTerminationLock.synchronized {
       lastTerminatedQueryException = null
     }
   }
 
-  /**
-   * Register a [[StreamingQueryListener]] to receive up-calls for life cycle 
events of
-   * [[StreamingQuery]].
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def addListener(listener: StreamingQueryListener): Unit = {
     listenerBus.addListener(listener)
   }
 
-  /**
-   * Deregister a [[StreamingQueryListener]].
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def removeListener(listener: StreamingQueryListener): Unit = {
     listenerBus.removeListener(listener)
   }
 
-  /**
-   * List all [[StreamingQueryListener]]s attached to this 
[[StreamingQueryManager]].
-   *
-   * @since 3.0.0
-   */
+  /** @inheritdoc */
   def listListeners(): Array[StreamingQueryListener] = {
     listenerBus.listeners.asScala.toArray
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to