This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 466a2155ebf [SPARK-43134][CONNECT][SS] JVM client StreamingQuery
exception() API
466a2155ebf is described below
commit 466a2155ebf8507cecc297a198cb990cd3d431f2
Author: Wei Liu <[email protected]>
AuthorDate: Wed Apr 26 21:58:21 2023 +0900
[SPARK-43134][CONNECT][SS] JVM client StreamingQuery exception() API
### What changes were proposed in this pull request?
Add StreamingQuery exception() API for JVM client
### Why are the changes needed?
Development of SS Connect
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Manual test:
```
Spark session available as 'spark'.
_____ __ ______ __
/ ___/____ ____ ______/ /__ / ____/___ ____ ____ ___ _____/ /_
\__ \/ __ \/ __ `/ ___/ //_/ / / / __ \/ __ \/ __ \/ _ \/ ___/ __/
___/ / /_/ / /_/ / / / ,< / /___/ /_/ / / / / / / / __/ /__/ /_
/____/ .___/\__,_/_/ /_/|_| \____/\____/_/ /_/_/ /_/\___/\___/\__/
/_/
val q =
spark.readStream.format("rate").load().writeStream.option("checkpointLocation",
"/home/wei.liu/ckpt").toTable("my_table")
q: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.streaming.RemoteStreamingQuery@772f3a3f
q.exception
res1: Option[org.apache.spark.sql.streaming.StreamingQueryException] = None
q.stop()
```
Closes #40906 from WweiL/SPARK-43134-scala-exception.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/streaming/StreamingQuery.scala | 20 ++++++++++++
.../sql/streaming/StreamingQueryException.scala | 38 ++++++++++++++++++++++
.../CheckConnectJvmClientCompatibility.scala | 3 --
.../src/main/protobuf/spark/connect/commands.proto | 6 +++-
.../sql/connect/planner/SparkConnectPlanner.scala | 3 +-
.../sql/connect/service/SparkConnectService.scala | 4 ++-
python/pyspark/sql/connect/proto/commands_pb2.py | 24 +++++++-------
python/pyspark/sql/connect/proto/commands_pb2.pyi | 23 ++++++++++++-
python/pyspark/sql/connect/streaming/query.py | 6 +++-
9 files changed, 107 insertions(+), 20 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 8bb35382162..a1bd8e264cc 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -78,6 +78,12 @@ trait StreamingQuery {
*/
def isActive: Boolean
+ /**
+ * Returns the [[StreamingQueryException]] if the query was terminated by an
exception.
+ * @since 3.5.0
+ */
+ def exception: Option[StreamingQueryException]
+
/**
* Returns the current status of the query.
*
@@ -199,6 +205,20 @@ class RemoteStreamingQuery(
// scalastyle:on println
}
+ override def exception: Option[StreamingQueryException] = {
+ val exception = executeQueryCmd(_.setException(true)).getException
+ if (exception.hasExceptionMessage) {
+ // TODO(SPARK-43206): Add more information to StreamingQueryException.
+ Some(
+ new StreamingQueryException(
+ // message maps to the return value of original
StreamingQueryException's toString method
+ message = exception.getExceptionMessage,
+ errorClass = exception.getErrorClass))
+ } else {
+ None
+ }
+ }
+
private def executeQueryCmd(
setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command
field, like stop().
): StreamingQueryCommandResult = {
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
new file mode 100644
index 00000000000..875c216a3e7
--- /dev/null
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Exception that stopped a [[StreamingQuery]] in Spark Connect. Currently not
all fields in the
+ * original StreamingQueryException are supported.
+ * @param message
+ * Maps to return value of original StreamingQueryException's toString method
+ * @param errorClass
+ * Error class of this exception
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryException private[sql] (message: String, errorClass:
String)
+ extends SparkThrowable {
+
+ // TODO(SPARK-43206): Add stack trace
+ override def getErrorClass: String = errorClass
+}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 1b8aacebc54..c71017bb271 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -247,9 +247,6 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem](
"org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" //
TODO(SPARK-43143)
),
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.StreamingQuery.exception" //
TODO(SPARK-43134)
- ),
ProblemFilters.exclude[Problem](
"org.apache.spark.sql.streaming.StreamingQueryProgress.*" //
TODO(SPARK-43128)
),
diff --git
a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 2b648bf0f9a..0d6c29da9f8 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -308,8 +308,12 @@ message StreamingQueryCommandResult {
}
message ExceptionResult {
- // Exception message as string
+ // (Optional) Exception message as string, maps to the return value of
original
+ // StreamingQueryException's toString method
optional string exception_message = 1;
+ // (Optional) Exception error class as string
+ optional string error_class = 2;
+ // TODO(SPARK-43206): Add stack trace
}
message AwaitTerminationResult {
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 11c02c72187..e7de15f62f9 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2328,7 +2328,8 @@ class SparkConnectPlanner(val session: SparkSession) {
val result = query.exception
result.foreach(e =>
respBuilder.getExceptionBuilder
- .setExceptionMessage(SparkConnectService.extractErrorMessage(e)))
+
.setExceptionMessage(SparkConnectService.abbreviateErrorMessage(e.toString))
+ .setErrorClass(e.getErrorClass))
case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
if (command.getAwaitTermination.hasTimeoutMs) {
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index b894f30990c..09a3ff39698 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -334,8 +334,10 @@ object SparkConnectService {
}
}
+ def abbreviateErrorMessage(msg: String): String =
StringUtils.abbreviate(msg, 2048)
+
def extractErrorMessage(st: Throwable): String = {
- val message = StringUtils.abbreviate(st.getMessage, 2048)
+ val message = abbreviateErrorMessage(st.getMessage)
if (message != null) {
message
} else {
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py
b/python/pyspark/sql/connect/proto/commands_pb2.py
index 27de95a7aaa..73575fbed85 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import relations_pb2 as
spark_dot_connect_dot_rel
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
[...]
+
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x
[...]
)
@@ -435,21 +435,21 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 4539
_STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 4615
_STREAMINGQUERYCOMMANDRESULT._serialized_start = 4629
- _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5661
+ _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5716
_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 5212
_STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 5382
_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 5384
_STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 5456
_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 5458
_STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 5497
- _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5499
- _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5588
- _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start =
5590
- _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5646
- _GETRESOURCESCOMMAND._serialized_start = 5663
- _GETRESOURCESCOMMAND._serialized_end = 5684
- _GETRESOURCESCOMMANDRESULT._serialized_start = 5687
- _GETRESOURCESCOMMANDRESULT._serialized_end = 5899
- _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5803
- _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5899
+ _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5500
+ _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5643
+ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start =
5645
+ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5701
+ _GETRESOURCESCOMMAND._serialized_start = 5718
+ _GETRESOURCESCOMMAND._serialized_end = 5739
+ _GETRESOURCESCOMMANDRESULT._serialized_start = 5742
+ _GETRESOURCESCOMMANDRESULT._serialized_end = 5954
+ _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5858
+ _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5954
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi
b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 972fe7503a1..81856352167 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -1087,18 +1087,30 @@ class
StreamingQueryCommandResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
EXCEPTION_MESSAGE_FIELD_NUMBER: builtins.int
+ ERROR_CLASS_FIELD_NUMBER: builtins.int
exception_message: builtins.str
- """Exception message as string"""
+ """(Optional) Exception message as string, maps to the return value of
original
+ StreamingQueryException's toString method
+ """
+ error_class: builtins.str
+ """(Optional) Exception error class as string
+ TODO(SPARK-43206): Add stack trace
+ """
def __init__(
self,
*,
exception_message: builtins.str | None = ...,
+ error_class: builtins.str | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
+ "_error_class",
+ b"_error_class",
"_exception_message",
b"_exception_message",
+ "error_class",
+ b"error_class",
"exception_message",
b"exception_message",
],
@@ -1106,12 +1118,21 @@ class
StreamingQueryCommandResult(google.protobuf.message.Message):
def ClearField(
self,
field_name: typing_extensions.Literal[
+ "_error_class",
+ b"_error_class",
"_exception_message",
b"_exception_message",
+ "error_class",
+ b"error_class",
"exception_message",
b"exception_message",
],
) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_error_class",
b"_error_class"]
+ ) -> typing_extensions.Literal["error_class"] | None: ...
+ @typing.overload
def WhichOneof(
self,
oneof_group: typing_extensions.Literal["_exception_message",
b"_exception_message"],
diff --git a/python/pyspark/sql/connect/streaming/query.py
b/python/pyspark/sql/connect/streaming/query.py
index eb196971985..fc207243ff3 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -148,7 +148,11 @@ class StreamingQuery:
cmd.exception = True
exception = self._execute_streaming_query_cmd(cmd).exception
if exception.HasField("exception_message"):
- return CapturedStreamingQueryException(exception.exception_message)
+ # Drop the Java StreamingQueryException type info
+ # exception_message maps to the return value of original
+ # StreamingQueryException's toString method
+ msg = exception.exception_message.split(": ", 1)[1]
+ return CapturedStreamingQueryException(msg)
else:
return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]