This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 26527c43e65 [SPARK-43299][SS][CONNECT] Convert StreamingQueryException
in Scala Client
26527c43e65 is described below
commit 26527c43e652718c6d6be8f2ae2f92e835e1b328
Author: Yihong He <[email protected]>
AuthorDate: Tue Oct 10 08:41:07 2023 +0900
[SPARK-43299][SS][CONNECT] Convert StreamingQueryException in Scala Client
### What changes were proposed in this pull request?
- Convert StreamingQueryException in Scala Client
- Move StreamingQueryException to common/utils module
- Implement (message, cause) constructor and getMessage() for
StreamingQueryException
- Obtain StreamingQueryException in StreamingQuery.exception by throwing it
from the server instead of returning it in the GRPC response
### Why are the changes needed?
- Compatibility with the existing behavior
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Existing tests
### Was this patch authored or co-authored using generative AI tooling?
Closes #42859 from heyihong/SPARK-43299.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/streaming/StreamingQueryException.scala | 10 +++++
.../spark/sql/streaming/StreamingQuery.scala | 18 ++++-----
.../sql/streaming/StreamingQueryException.scala | 47 ----------------------
.../CheckConnectJvmClientCompatibility.scala | 12 ------
.../sql/streaming/ClientStreamingQuerySuite.scala | 4 +-
.../connect/client/GrpcExceptionConverter.scala | 7 ++++
.../sql/connect/planner/SparkConnectPlanner.scala | 6 +++
project/MimaExcludes.scala | 3 ++
8 files changed, 37 insertions(+), 70 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
similarity index 89%
rename from
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
rename to
common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 738c79769bb..77415fb4759 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -42,6 +42,14 @@ class StreamingQueryException private[sql](
messageParameters: Map[String, String])
extends Exception(message, cause) with SparkThrowable {
+ private[spark] def this(
+ message: String,
+ cause: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]) = {
+ this("", message, cause, null, null, errorClass, messageParameters)
+ }
+
def this(
queryDebugString: String,
cause: Throwable,
@@ -62,6 +70,8 @@ class StreamingQueryException private[sql](
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
+ override def getMessage: String = s"${message}\n${queryDebugString}"
+
override def toString(): String =
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
|$queryDebugString""".stripMargin
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index a48367b468d..48ef0a907b5 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -242,17 +242,15 @@ class RemoteStreamingQuery(
}
override def exception: Option[StreamingQueryException] = {
- val exception = executeQueryCmd(_.setException(true)).getException
- if (exception.hasExceptionMessage) {
- Some(
- new StreamingQueryException(
- // message maps to the return value of original
StreamingQueryException's toString method
- message = exception.getExceptionMessage,
- errorClass = exception.getErrorClass,
- stackTrace = exception.getStackTrace))
- } else {
- None
+ try {
+ // When exception field is set to false, the server throws a
StreamingQueryException
+ // to the client.
+ executeQueryCmd(_.setException(false))
+ } catch {
+ case e: StreamingQueryException => return Some(e)
}
+
+ None
}
private def executeQueryCmd(
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
deleted file mode 100644
index 512c94f5c70..00000000000
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkThrowable
-import org.apache.spark.annotation.Evolving
-
-/**
- * Exception that stopped a [[StreamingQuery]] in Spark Connect. Currently not
all fields in the
- * original StreamingQueryException are supported.
- * @param message
- * Maps to return value of original StreamingQueryException's toString method
- * @param errorClass
- * Error class of this exception
- * @param stackTrace
- * Stack trace of this exception
- * @since 3.5.0
- */
-@Evolving
-class StreamingQueryException private[sql] (
- message: String,
- errorClass: String,
- stackTrace: String)
- extends Exception(message)
- with SparkThrowable {
-
- override def getErrorClass: String = errorClass
-
- override def toString: String = s"""$message
- |JVM stacktrace: $stackTrace
- |""".stripMargin
-}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a6cd20aff68..785e1fa4017 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -254,18 +254,6 @@ object CheckConnectJvmClientCompatibility {
"org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are
constant vals.
),
- // StreamingQueryException
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.streaming.StreamingQueryException.message"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.streaming.StreamingQueryException.cause"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.streaming.StreamingQueryException.time"),
-
// Classes missing from streaming API
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.TestGroupState"),
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index 8bb83a1ac25..f995a0c3d94 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -199,10 +199,12 @@ class ClientStreamingQuerySuite extends QueryTest with
SQLHelper with Logging {
.format("console")
.start()
- val exception = intercept[SparkException] {
+ val exception = intercept[StreamingQueryException] {
query.awaitTermination()
}
+ assert(exception.getErrorClass != null)
+ assert(!exception.getMessageParameters.isEmpty)
assert(exception.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 1d608bdf03c..85a523a1372 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.AnalysisException
import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException,
TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.streaming.StreamingQueryException
/**
* GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions
into Spark exceptions.
@@ -178,6 +179,12 @@ private object GrpcExceptionConverter {
}
private val errorFactory = Map(
+ errorConstructor(params =>
+ new StreamingQueryException(
+ params.message,
+ params.cause.orNull,
+ params.errorClass.orNull,
+ params.messageParameters)),
errorConstructor(params =>
new ParseException(
None,
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 52c61c2cbe9..eead5cb38ad 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3076,6 +3076,12 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
case StreamingQueryCommand.CommandCase.EXCEPTION =>
val result = query.exception
if (result.isDefined) {
+ // Throw StreamingQueryException directly and rely on error
translation on the
+ // client-side to reconstruct the exception. Keep the remaining
implementation
+ // for backward-compatibility
+ if (!command.getException) {
+ throw result.get
+ }
val e = result.get
val exception_builder = StreamingQueryCommandResult.ExceptionResult
.newBuilder()
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 52440ca7d17..691425cfc5d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -74,6 +74,9 @@ object MimaExcludes {
ProblemFilters.exclude[Problem]("org.sparkproject.spark_protobuf.protobuf.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"),
+ // SPARK-43299: Convert StreamingQueryException in Scala Client
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException"),
+
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
!cls.fullName.startsWith("org.sparkproject.jpmml") &&
!cls.fullName.startsWith("org.sparkproject.dmg.pmml")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]