This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d813f5467e93 [SPARK-49417][CONNECT][SQL] Add Shared
StreamingQueryManager interface
d813f5467e93 is described below
commit d813f5467e930ed4a22d2ea6aa4333cf379ea7f9
Author: Herman van Hovell <[email protected]>
AuthorDate: Fri Sep 27 19:41:37 2024 -0400
[SPARK-49417][CONNECT][SQL] Add Shared StreamingQueryManager interface
### What changes were proposed in this pull request?
This PR adds a shared StreamingQueryManager interface.
### Why are the changes needed?
We are working on a shared Scala SQL interface for Classic and Connect.
This change is part of this work.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48217 from hvanhovell/SPARK-49417.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 1 +
.../sql/streaming/StreamingQueryManager.scala | 93 +++------------
.../org/apache/spark/sql/api/SparkSession.scala | 11 +-
.../spark/sql/api/StreamingQueryManager.scala | 130 +++++++++++++++++++++
.../scala/org/apache/spark/sql/SparkSession.scala | 7 +-
.../sql/streaming/StreamingQueryManager.scala | 95 +++------------
6 files changed, 169 insertions(+), 168 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 1b41566ca1d1..b31670c1da57 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -212,6 +212,7 @@ class SparkSession private[sql] (
/** @inheritdoc */
def readStream: DataStreamReader = new DataStreamReader(this)
+ /** @inheritdoc */
lazy val streams: StreamingQueryManager = new StreamingQueryManager(this)
/** @inheritdoc */
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 7efced227d6d..647d29c714db 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -27,7 +27,7 @@ import org.apache.spark.connect.proto.Command
import org.apache.spark.connect.proto.StreamingQueryManagerCommand
import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{api, SparkSession}
import org.apache.spark.sql.connect.common.InvalidPlanInput
/**
@@ -36,7 +36,9 @@ import org.apache.spark.sql.connect.common.InvalidPlanInput
* @since 3.5.0
*/
@Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends
Logging {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession)
+ extends api.StreamingQueryManager
+ with Logging {
// Mapping from id to StreamingQueryListener. There's another mapping from
id to
// StreamingQueryListener on server side. This is used by removeListener()
to find the id
@@ -53,29 +55,17 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession) extends Lo
streamingQueryListenerBus.close()
}
- /**
- * Returns a list of active queries associated with this SQLContext
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def active: Array[StreamingQuery] = {
executeManagerCmd(_.setActive(true)).getActive.getActiveQueriesList.asScala.map
{ q =>
RemoteStreamingQuery.fromStreamingQueryInstanceResponse(sparkSession, q)
}.toArray
}
- /**
- * Returns the query if there is an active query with the given id, or null.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def get(id: UUID): StreamingQuery = get(id.toString)
- /**
- * Returns the query if there is an active query with the given id, or null.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def get(id: String): StreamingQuery = {
val response = executeManagerCmd(_.setGetQuery(id))
if (response.hasQuery) {
@@ -85,52 +75,13 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession) extends Lo
}
}
- /**
- * Wait until any of the queries on the associated SQLContext has terminated
since the creation
- * of the context, or since `resetTerminated()` was called. If any query was
terminated with an
- * exception, then the exception will be thrown.
- *
- * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
- * return immediately (if the query was terminated by `query.stop()`), or
throw the exception
- * immediately (if the query was terminated with exception). Use
`resetTerminated()` to clear
- * past terminations and wait for new terminations.
- *
- * In the case where multiple queries have terminated since
`resetTermination()` was called, if
- * any query has terminated with exception, then `awaitAnyTermination()`
will throw any of the
- * exception. For correctly documenting exceptions across multiple queries,
users need to stop
- * all of them after any of them terminates with exception, and then check
the
- * `query.exception()` for each query.
- *
- * @throws StreamingQueryException
- * if any query has terminated with an exception
- * @since 3.5.0
- */
+ /** @inheritdoc */
@throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
executeManagerCmd(_.getAwaitAnyTerminationBuilder.build())
}
- /**
- * Wait until any of the queries on the associated SQLContext has terminated
since the creation
- * of the context, or since `resetTerminated()` was called. Returns whether
any query has
- * terminated or not (multiple may have terminated). If any query has
terminated with an
- * exception, then the exception will be thrown.
- *
- * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
- * return `true` immediately (if the query was terminated by
`query.stop()`), or throw the
- * exception immediately (if the query was terminated with exception). Use
`resetTerminated()`
- * to clear past terminations and wait for new terminations.
- *
- * In the case where multiple queries have terminated since
`resetTermination()` was called, if
- * any query has terminated with exception, then `awaitAnyTermination()`
will throw any of the
- * exception. For correctly documenting exceptions across multiple queries,
users need to stop
- * all of them after any of them terminates with exception, and then check
the
- * `query.exception()` for each query.
- *
- * @throws StreamingQueryException
- * if any query has terminated with an exception
- * @since 3.5.0
- */
+ /** @inheritdoc */
@throws[StreamingQueryException]
def awaitAnyTermination(timeoutMs: Long): Boolean = {
require(timeoutMs > 0, "Timeout has to be positive")
@@ -139,40 +90,22 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession) extends Lo
timeoutMs)).getAwaitAnyTermination.getTerminated
}
- /**
- * Forget about past terminated queries so that `awaitAnyTermination()` can
be used again to
- * wait for new terminations.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def resetTerminated(): Unit = {
executeManagerCmd(_.setResetTerminated(true))
}
- /**
- * Register a [[StreamingQueryListener]] to receive up-calls for life cycle
events of
- * [[StreamingQuery]].
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def addListener(listener: StreamingQueryListener): Unit = {
streamingQueryListenerBus.append(listener)
}
- /**
- * Deregister a [[StreamingQueryListener]].
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def removeListener(listener: StreamingQueryListener): Unit = {
streamingQueryListenerBus.remove(listener)
}
- /**
- * List all [[StreamingQueryListener]]s attached to this
[[StreamingQueryManager]].
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def listListeners(): Array[StreamingQueryListener] = {
streamingQueryListenerBus.list()
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 0f73a94c3c4a..4dfeb87a11d9 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -25,7 +25,7 @@ import _root_.java.lang
import _root_.java.net.URI
import _root_.java.util
-import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SparkClassUtils
@@ -93,6 +93,15 @@ abstract class SparkSession extends Serializable with
Closeable {
*/
def udf: UDFRegistration
+ /**
+ * Returns a `StreamingQueryManager` that allows managing all the
`StreamingQuery`s active on
+ * `this`.
+ *
+ * @since 2.0.0
+ */
+ @Unstable
+ def streams: StreamingQueryManager
+
/**
* Start a new session with isolated SQL configurations, temporary tables,
registered functions
* are isolated, but sharing the underlying `SparkContext` and cached data.
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala
new file mode 100644
index 000000000000..88ba9a493d06
--- /dev/null
+++
b/sql/api/src/main/scala/org/apache/spark/sql/api/StreamingQueryManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import _root_.java.util.UUID
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.streaming.{StreamingQueryException,
StreamingQueryListener}
+
+/**
+ * A class to manage all the [[StreamingQuery]]s active in a `SparkSession`.
+ *
+ * @since 2.0.0
+ */
+@Evolving
+abstract class StreamingQueryManager {
+
+ /**
+ * Returns a list of active queries associated with this SQLContext
+ *
+ * @since 2.0.0
+ */
+ def active: Array[_ <: StreamingQuery]
+
+ /**
+ * Returns the query if there is an active query with the given id, or null.
+ *
+ * @since 2.1.0
+ */
+ def get(id: UUID): StreamingQuery
+
+ /**
+ * Returns the query if there is an active query with the given id, or null.
+ *
+ * @since 2.1.0
+ */
+ def get(id: String): StreamingQuery
+
+ /**
+ * Wait until any of the queries on the associated SQLContext has terminated
since the creation
+ * of the context, or since `resetTerminated()` was called. If any query was
terminated with an
+ * exception, then the exception will be thrown.
+ *
+ * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
+ * return immediately (if the query was terminated by `query.stop()`), or
throw the exception
+ * immediately (if the query was terminated with exception). Use
`resetTerminated()` to clear
+ * past terminations and wait for new terminations.
+ *
+ * In the case where multiple queries have terminated since
`resetTerminated()` was called, if
+ * any query has terminated with exception, then `awaitAnyTermination()`
will throw any of the
+ * exception. For correctly documenting exceptions across multiple queries,
users need to stop
+ * all of them after any of them terminates with exception, and then check
the
+ * `query.exception()` for each query.
+ *
+ * @throws org.apache.spark.sql.streaming.StreamingQueryException
+ * if any query has terminated with an exception
+ * @since 2.0.0
+ */
+ @throws[StreamingQueryException]
+ def awaitAnyTermination(): Unit
+
+ /**
+ * Wait until any of the queries on the associated SQLContext has terminated
since the creation
+ * of the context, or since `resetTerminated()` was called. Returns whether
any query has
+ * terminated or not (multiple may have terminated). If any query has
terminated with an
+ * exception, then the exception will be thrown.
+ *
+ * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
+ * return `true` immediately (if the query was terminated by
`query.stop()`), or throw the
+ * exception immediately (if the query was terminated with exception). Use
`resetTerminated()`
+ * to clear past terminations and wait for new terminations.
+ *
+ * In the case where multiple queries have terminated since
`resetTerminated()` was called, if
+ * any query has terminated with exception, then `awaitAnyTermination()`
will throw any of the
+ * exception. For correctly documenting exceptions across multiple queries,
users need to stop
+ * all of them after any of them terminates with exception, and then check
the
+ * `query.exception()` for each query.
+ *
+ * @throws org.apache.spark.sql.streaming.StreamingQueryException
+ * if any query has terminated with an exception
+ * @since 2.0.0
+ */
+ @throws[StreamingQueryException]
+ def awaitAnyTermination(timeoutMs: Long): Boolean
+
+ /**
+ * Forget about past terminated queries so that `awaitAnyTermination()` can
be used again to
+ * wait for new terminations.
+ *
+ * @since 2.0.0
+ */
+ def resetTerminated(): Unit
+
+ /**
+ * Register a [[org.apache.spark.sql.streaming.StreamingQueryListener]] to
receive up-calls for
+ * life cycle events of [[StreamingQuery]].
+ *
+ * @since 2.0.0
+ */
+ def addListener(listener: StreamingQueryListener): Unit
+
+ /**
+ * Deregister a [[org.apache.spark.sql.streaming.StreamingQueryListener]].
+ *
+ * @since 2.0.0
+ */
+ def removeListener(listener: StreamingQueryListener): Unit
+
+ /**
+ * List all [[org.apache.spark.sql.streaming.StreamingQueryListener]]s
attached to this
+ * [[StreamingQueryManager]].
+ *
+ * @since 3.0.0
+ */
+ def listListeners(): Array[StreamingQueryListener]
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 983cc24718fd..eeb46fbf145d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -229,12 +229,7 @@ class SparkSession private(
@Unstable
def dataSource: DataSourceRegistration = sessionState.dataSourceRegistration
- /**
- * Returns a `StreamingQueryManager` that allows managing all the
- * `StreamingQuery`s active on `this`.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@Unstable
def streams: StreamingQueryManager = sessionState.streamingQueryManager
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 9d6fd2e28dea..42f6d04466b0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{api, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.streaming.{WriteToStream,
WriteToStreamStatement}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite,
Table, TableCatalog}
@@ -47,7 +47,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
@Evolving
class StreamingQueryManager private[sql] (
sparkSession: SparkSession,
- sqlConf: SQLConf) extends Logging {
+ sqlConf: SQLConf)
+ extends api.StreamingQueryManager
+ with Logging {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
@@ -70,7 +72,7 @@ class StreamingQueryManager private[sql] (
* failed. The exception is the exception of the last failed query.
*/
@GuardedBy("awaitTerminationLock")
- private var lastTerminatedQueryException: Option[StreamingQueryException] =
null
+ private var lastTerminatedQueryException: Option[StreamingQueryException] = _
try {
sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach {
classNames =>
@@ -90,51 +92,20 @@ class StreamingQueryManager private[sql] (
throw QueryExecutionErrors.registeringStreamingQueryListenerError(e)
}
- /**
- * Returns a list of active queries associated with this SQLContext
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
activeQueries.values.toArray
}
- /**
- * Returns the query if there is an active query with the given id, or null.
- *
- * @since 2.1.0
- */
+ /** @inheritdoc */
def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
activeQueries.get(id).orNull
}
- /**
- * Returns the query if there is an active query with the given id, or null.
- *
- * @since 2.1.0
- */
+ /** @inheritdoc */
def get(id: String): StreamingQuery = get(UUID.fromString(id))
- /**
- * Wait until any of the queries on the associated SQLContext has terminated
since the
- * creation of the context, or since `resetTerminated()` was called. If any
query was terminated
- * with an exception, then the exception will be thrown.
- *
- * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
- * return immediately (if the query was terminated by `query.stop()`),
- * or throw the exception immediately (if the query was terminated with
exception). Use
- * `resetTerminated()` to clear past terminations and wait for new
terminations.
- *
- * In the case where multiple queries have terminated since
`resetTermination()` was called,
- * if any query has terminated with exception, then `awaitAnyTermination()`
will
- * throw any of the exception. For correctly documenting exceptions across
multiple queries,
- * users need to stop all of them after any of them terminates with
exception, and then check the
- * `query.exception()` for each query.
- *
- * @throws StreamingQueryException if any query has terminated with an
exception
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
awaitTerminationLock.synchronized {
@@ -147,27 +118,7 @@ class StreamingQueryManager private[sql] (
}
}
- /**
- * Wait until any of the queries on the associated SQLContext has terminated
since the
- * creation of the context, or since `resetTerminated()` was called. Returns
whether any query
- * has terminated or not (multiple may have terminated). If any query has
terminated with an
- * exception, then the exception will be thrown.
- *
- * If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either
- * return `true` immediately (if the query was terminated by `query.stop()`),
- * or throw the exception immediately (if the query was terminated with
exception). Use
- * `resetTerminated()` to clear past terminations and wait for new
terminations.
- *
- * In the case where multiple queries have terminated since
`resetTermination()` was called,
- * if any query has terminated with exception, then `awaitAnyTermination()`
will
- * throw any of the exception. For correctly documenting exceptions across
multiple queries,
- * users need to stop all of them after any of them terminates with
exception, and then check the
- * `query.exception()` for each query.
- *
- * @throws StreamingQueryException if any query has terminated with an
exception
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@throws[StreamingQueryException]
def awaitAnyTermination(timeoutMs: Long): Boolean = {
@@ -187,42 +138,24 @@ class StreamingQueryManager private[sql] (
}
}
- /**
- * Forget about past terminated queries so that `awaitAnyTermination()` can
be used again to
- * wait for new terminations.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def resetTerminated(): Unit = {
awaitTerminationLock.synchronized {
lastTerminatedQueryException = null
}
}
- /**
- * Register a [[StreamingQueryListener]] to receive up-calls for life cycle
events of
- * [[StreamingQuery]].
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def addListener(listener: StreamingQueryListener): Unit = {
listenerBus.addListener(listener)
}
- /**
- * Deregister a [[StreamingQueryListener]].
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def removeListener(listener: StreamingQueryListener): Unit = {
listenerBus.removeListener(listener)
}
- /**
- * List all [[StreamingQueryListener]]s attached to this
[[StreamingQueryManager]].
- *
- * @since 3.0.0
- */
+ /** @inheritdoc */
def listListeners(): Array[StreamingQueryListener] = {
listenerBus.listeners.asScala.toArray
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]