This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e46a8616fb [SPARK-45347][SQL][CONNECT] Include SparkThrowable in FetchErrorDetailsResponse
2e46a8616fb is described below
commit 2e46a8616fbfc1d11949daa539ba9fdba8e438b8
Author: Yihong He <[email protected]>
AuthorDate: Wed Oct 4 08:33:52 2023 +0900
[SPARK-45347][SQL][CONNECT] Include SparkThrowable in FetchErrorDetailsResponse
### What changes were proposed in this pull request?
- Include SparkThrowable in FetchErrorDetailsResponse
- Reconstruct server exceptions with SparkThrowable on the client side
### Why are the changes needed?
- Better integration with the error framework: the error class and message parameters of server-side `SparkThrowable` exceptions now survive the round trip to the Connect client
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`
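
In effect, the suite now asserts that structured error data survives the round trip. A minimal sketch of the client-visible behavior (assuming a Spark Connect `spark` session and ScalaTest's `intercept`, as in the test diff below):

```scala
// Minimal sketch: with this change, errorClass/messageParameters/sqlState are
// reconstructed on the client from the new FetchErrorDetailsResponse.SparkThrowable.
import org.apache.spark.sql.AnalysisException

val ex = intercept[AnalysisException] {
  spark.sql("select x").collect() // unresolved column -> AnalysisException
}
assert(ex.getErrorClass != null)      // from spark_throwable.error_class
assert(!ex.messageParameters.isEmpty) // from spark_throwable.message_parameters
assert(ex.getSqlState != null)        // derived from the error class
```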
### Was this patch authored or co-authored using generative AI tooling?
Closes #43136 from heyihong/SPARK-45347.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 33 +++++++--
.../src/main/protobuf/spark/connect/base.proto | 12 +++
.../connect/client/GrpcExceptionConverter.scala | 86 +++++++++++++++-------
.../spark/sql/connect/utils/ErrorUtils.scala | 12 +++
python/pyspark/sql/connect/proto/base_pb2.py | 18 +++--
python/pyspark/sql/connect/proto/base_pb2.pyi | 81 +++++++++++++++++++-
6 files changed, 202 insertions(+), 40 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 85d98babcf9..6d825f22b35 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -106,8 +106,14 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
Seq("1").toDS.withColumn("udf_val", throwException($"value")).collect()
}
+ assert(ex.getErrorClass != null)
+ assert(!ex.getMessageParameters.isEmpty)
assert(ex.getCause.isInstanceOf[SparkException])
- assert(ex.getCause.getMessage.contains("test" * 10000))
+
+ val cause = ex.getCause.asInstanceOf[SparkException]
+ assert(cause.getErrorClass == null)
+ assert(cause.getMessageParameters.isEmpty)
+ assert(cause.getMessage.contains("test" * 10000))
}
}
@@ -119,6 +125,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
val ex = intercept[AnalysisException] {
spark.sql("select x").collect()
}
+ assert(ex.getErrorClass != null)
+ assert(!ex.messageParameters.isEmpty)
+ assert(ex.getSqlState != null)
+ assert(!ex.isInternalError)
assert(
ex.getStackTrace
.find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
@@ -137,23 +147,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
}
test("throw NoSuchDatabaseException") {
- intercept[NoSuchDatabaseException] {
+ val ex = intercept[NoSuchDatabaseException] {
spark.sql("use database123")
}
+ assert(ex.getErrorClass != null)
}
test("throw NoSuchTableException") {
- intercept[NoSuchTableException] {
+ val ex = intercept[NoSuchTableException] {
spark.catalog.getTable("test_table")
}
+ assert(ex.getErrorClass != null)
}
test("throw NamespaceAlreadyExistsException") {
try {
spark.sql("create database test_db")
- intercept[NamespaceAlreadyExistsException] {
+ val ex = intercept[NamespaceAlreadyExistsException] {
spark.sql("create database test_db")
}
+ assert(ex.getErrorClass != null)
} finally {
spark.sql("drop database test_db")
}
@@ -162,9 +175,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
test("throw TempTableAlreadyExistsException") {
try {
spark.sql("create temporary view test_view as select 1")
- intercept[TempTableAlreadyExistsException] {
+ val ex = intercept[TempTableAlreadyExistsException] {
spark.sql("create temporary view test_view as select 1")
}
+ assert(ex.getErrorClass != null)
} finally {
spark.sql("drop view test_view")
}
@@ -173,16 +187,21 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
test("throw TableAlreadyExistsException") {
withTable("testcat.test_table") {
spark.sql(s"create table testcat.test_table (id int)")
- intercept[TableAlreadyExistsException] {
+ val ex = intercept[TableAlreadyExistsException] {
spark.sql(s"create table testcat.test_table (id int)")
}
+ assert(ex.getErrorClass != null)
}
}
test("throw ParseException") {
- intercept[ParseException] {
+ val ex = intercept[ParseException] {
spark.sql("selet 1").collect()
}
+ assert(ex.getErrorClass != null)
+ assert(!ex.messageParameters.isEmpty)
+ assert(ex.getSqlState != null)
+ assert(!ex.isInternalError)
}
test("spark deep recursion") {
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index b30c578421c..65e5b18d59b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -814,6 +814,15 @@ message FetchErrorDetailsResponse {
int32 line_number = 4;
}
+ // SparkThrowable defines the schema for SparkThrowable exceptions.
+ message SparkThrowable {
+ // Succinct, human-readable, unique, and consistent representation of the error category.
+ optional string error_class = 1;
+
+ // message parameters for the error framework.
+ map<string, string> message_parameters = 2;
+ }
+
// Error defines the schema for the representing exception.
message Error {
// The fully qualified names of the exception class and its parent classes.
@@ -828,6 +837,9 @@ message FetchErrorDetailsResponse {
// The index of the cause error in errors.
optional int32 cause_idx = 4;
+
+ // The structured data of a SparkThrowable exception.
+ optional SparkThrowable spark_throwable = 5;
}
// The index of the root error in errors. The field will not be set if the error is not found.
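
For orientation, a hedged sketch of populating the new message with the generated Java builders (this mirrors the server-side `ErrorUtils.scala` hunk further below; the error class and parameter values here are illustrative, not taken from the patch):

```scala
// Sketch only: building an Error entry that carries the new SparkThrowable message.
import org.apache.spark.connect.proto.FetchErrorDetailsResponse

val sparkThrowable = FetchErrorDetailsResponse.SparkThrowable
  .newBuilder()
  .setErrorClass("DIVIDE_BY_ZERO")                          // optional field 1 (illustrative)
  .putMessageParameters("config", "spark.sql.ansi.enabled") // map field 2
  .build()

val error = FetchErrorDetailsResponse.Error
  .newBuilder()
  .addErrorTypeHierarchy("org.apache.spark.SparkArithmeticException")
  .setMessage("[DIVIDE_BY_ZERO] Division by zero.")
  .setSparkThrowable(sparkThrowable)                        // new optional field 5
  .build()
```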
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 0cc0fed52b0..1d608bdf03c 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -163,34 +163,58 @@ private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlocki
private object GrpcExceptionConverter {
+ private case class ErrorParams(
+ message: String,
+ cause: Option[Throwable],
+ // errorClass will only be set if the error is enriched and SparkThrowable.
+ errorClass: Option[String],
+ // messageParameters will only be set if the error is enriched and SparkThrowable.
+ messageParameters: Map[String, String])
+
private def errorConstructor[T <: Throwable: ClassTag](
- throwableCtr: (String, Option[Throwable]) => T)
- : (String, (String, Option[Throwable]) => Throwable) = {
+ throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName
(className, throwableCtr)
}
private val errorFactory = Map(
- errorConstructor((message, _) => new ParseException(None, message, Origin(), Origin())),
- errorConstructor((message, cause) => new AnalysisException(message, cause = cause)),
- errorConstructor((message, _) => new NamespaceAlreadyExistsException(message)),
- errorConstructor((message, cause) => new TableAlreadyExistsException(message, cause)),
- errorConstructor((message, cause) => new TempTableAlreadyExistsException(message, cause)),
- errorConstructor((message, cause) => new NoSuchDatabaseException(message, cause)),
- errorConstructor((message, cause) => new NoSuchTableException(message, cause)),
- errorConstructor[NumberFormatException]((message, _) =>
- new SparkNumberFormatException(message)),
- errorConstructor[IllegalArgumentException]((message, cause) =>
- new SparkIllegalArgumentException(message, cause)),
- errorConstructor[ArithmeticException]((message, _) => new SparkArithmeticException(message)),
- errorConstructor[UnsupportedOperationException]((message, _) =>
- new SparkUnsupportedOperationException(message)),
- errorConstructor[ArrayIndexOutOfBoundsException]((message, _) =>
- new SparkArrayIndexOutOfBoundsException(message)),
- errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
- errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
- errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
- errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+ errorConstructor(params =>
+ new ParseException(
+ None,
+ params.message,
+ Origin(),
+ Origin(),
+ errorClass = params.errorClass,
+ messageParameters = params.messageParameters)),
+ errorConstructor(params =>
+ new AnalysisException(
+ params.message,
+ cause = params.cause,
+ errorClass = params.errorClass,
+ messageParameters = params.messageParameters)),
+ errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
+ errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
+ errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
+ errorConstructor(params => new NoSuchDatabaseException(params.message, params.cause)),
+ errorConstructor(params => new NoSuchTableException(params.message, params.cause)),
+ errorConstructor[NumberFormatException](params =>
+ new SparkNumberFormatException(params.message)),
+ errorConstructor[IllegalArgumentException](params =>
+ new SparkIllegalArgumentException(params.message, params.cause)),
+ errorConstructor[ArithmeticException](params => new SparkArithmeticException(params.message)),
+ errorConstructor[UnsupportedOperationException](params =>
+ new SparkUnsupportedOperationException(params.message)),
+ errorConstructor[ArrayIndexOutOfBoundsException](params =>
+ new SparkArrayIndexOutOfBoundsException(params.message)),
+ errorConstructor[DateTimeException](params => new SparkDateTimeException(params.message)),
+ errorConstructor(params => new SparkRuntimeException(params.message, params.cause)),
+ errorConstructor(params => new SparkUpgradeException(params.message, params.cause)),
+ errorConstructor(params =>
+ new SparkException(
+ message = params.message,
+ cause = params.cause.orNull,
+ errorClass = params.errorClass,
+ messageParameters = params.messageParameters)))
/**
* errorsToThrowable reconstructs the exception based on a list of protobuf messages
@@ -202,20 +226,30 @@ private object GrpcExceptionConverter {
errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
val error = errors(errorIdx)
-
val classHierarchy = error.getErrorTypeHierarchyList.asScala
val constructor =
classHierarchy
.flatMap(errorFactory.get)
.headOption
- .getOrElse((message: String, cause: Option[Throwable]) =>
- new SparkException(s"${classHierarchy.head}: ${message}", cause.orNull))
+ .getOrElse((params: ErrorParams) =>
+ errorFactory
+ .get(classOf[SparkException].getName)
+ .get(params.copy(message = s"${classHierarchy.head}: ${params.message}")))
val causeOpt =
if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
- val exception = constructor(error.getMessage, causeOpt)
+ val exception = constructor(
+ ErrorParams(
+ message = error.getMessage,
+ cause = causeOpt,
+ errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
+ Some(error.getSparkThrowable.getErrorClass)
+ } else None,
+ messageParameters = if (error.hasSparkThrowable) {
+ error.getSparkThrowable.getMessageParametersMap.asScala.toMap
+ } else Map.empty))
if (!error.getStackTraceList.isEmpty) {
exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
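
For context, a hedged sketch of what client code sees after reconstruction (`errorsToThrowable` itself is `private[client]`; `describe` is a hypothetical helper):

```scala
// Sketch: enriched errors surface errorClass/messageParameters via SparkThrowable;
// unknown server classes fall back to SparkException with the original class name
// prepended to the message (the getOrElse branch above).
import org.apache.spark.{SparkException, SparkThrowable}

def describe(t: Throwable): String = t match {
  case st: SparkThrowable if st.getErrorClass != null =>
    s"${st.getErrorClass} ${st.getMessageParameters}" // from spark_throwable
  case e: SparkException =>
    e.getMessage // un-enriched error, or fallback for an unknown class
  case other =>
    other.toString
}
```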
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 78c1f723c90..6e905895236 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -108,6 +108,18 @@ private[connect] object ErrorUtils extends Logging {
.asJava)
}
+ currentError match {
+ case sparkThrowable: SparkThrowable =>
+ val sparkThrowableBuilder = FetchErrorDetailsResponse.SparkThrowable
+ .newBuilder()
+ if (sparkThrowable.getErrorClass != null) {
+ sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass)
+ }
+ sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters)
+ builder.setSparkThrowable(sparkThrowableBuilder.build())
+ case _ =>
+ }
+
val causeIdx = buffer.size + 1
if (causeIdx < MAX_ERROR_CHAIN_LENGTH && currentError.getCause != null) {
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 0cc8085763c..19843e39330 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -51,6 +51,8 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_options = b"8\001"
_ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._options = None
_ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_options = b"8\001"
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._options = None
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_options = b"8\001"
_PLAN._serialized_start = 219
_PLAN._serialized_end = 335
_USERCONTEXT._serialized_start = 337
@@ -200,11 +202,15 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_FETCHERRORDETAILSREQUEST._serialized_start = 11301
_FETCHERRORDETAILSREQUEST._serialized_end = 11502
_FETCHERRORDETAILSRESPONSE._serialized_start = 11505
- _FETCHERRORDETAILSRESPONSE._serialized_end = 12070
+ _FETCHERRORDETAILSRESPONSE._serialized_end = 12463
_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 11650
_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 11824
- _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 11827
- _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12051
- _SPARKCONNECTSERVICE._serialized_start = 12073
- _SPARKCONNECTSERVICE._serialized_end = 12922
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 11827
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 12094
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 12010
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 12078
+ _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 12097
+ _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12444
+ _SPARKCONNECTSERVICE._serialized_start = 12466
+ _SPARKCONNECTSERVICE._serialized_end = 13315
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 6320ec7bb56..636fed63125 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2844,6 +2844,64 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
) -> typing_extensions.Literal["file_name"] | None: ...
+ class SparkThrowable(google.protobuf.message.Message):
+ """SparkThrowable defines the schema for SparkThrowable exceptions."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class MessageParametersEntry(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ KEY_FIELD_NUMBER: builtins.int
+ VALUE_FIELD_NUMBER: builtins.int
+ key: builtins.str
+ value: builtins.str
+ def __init__(
+ self,
+ *,
+ key: builtins.str = ...,
+ value: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]
+ ) -> None: ...
+
+ ERROR_CLASS_FIELD_NUMBER: builtins.int
+ MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int
+ error_class: builtins.str
+ """Succinct, human-readable, unique, and consistent representation of the error category."""
+ @property
+ def message_parameters(
+ self,
+ ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+ """message parameters for the error framework."""
+ def __init__(
+ self,
+ *,
+ error_class: builtins.str | None = ...,
+ message_parameters: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_error_class", b"_error_class", "error_class", b"error_class"
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_error_class",
+ b"_error_class",
+ "error_class",
+ b"error_class",
+ "message_parameters",
+ b"message_parameters",
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_error_class", b"_error_class"]
+ ) -> typing_extensions.Literal["error_class"] | None: ...
+
class Error(google.protobuf.message.Message):
"""Error defines the schema for the representing exception."""
@@ -2853,6 +2911,7 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
MESSAGE_FIELD_NUMBER: builtins.int
STACK_TRACE_FIELD_NUMBER: builtins.int
CAUSE_IDX_FIELD_NUMBER: builtins.int
+ SPARK_THROWABLE_FIELD_NUMBER: builtins.int
@property
def error_type_hierarchy(
self,
@@ -2871,6 +2930,9 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
"""
cause_idx: builtins.int
"""The index of the cause error in errors."""
+ @property
+ def spark_throwable(self) -> global___FetchErrorDetailsResponse.SparkThrowable:
+ """The structured data of a SparkThrowable exception."""
def __init__(
self,
*,
@@ -2881,11 +2943,19 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
]
| None = ...,
cause_idx: builtins.int | None = ...,
+ spark_throwable: global___FetchErrorDetailsResponse.SparkThrowable | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
- "_cause_idx", b"_cause_idx", "cause_idx", b"cause_idx"
+ "_cause_idx",
+ b"_cause_idx",
+ "_spark_throwable",
+ b"_spark_throwable",
+ "cause_idx",
+ b"cause_idx",
+ "spark_throwable",
+ b"spark_throwable",
],
) -> builtins.bool: ...
def ClearField(
@@ -2893,19 +2963,28 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_cause_idx",
b"_cause_idx",
+ "_spark_throwable",
+ b"_spark_throwable",
"cause_idx",
b"cause_idx",
"error_type_hierarchy",
b"error_type_hierarchy",
"message",
b"message",
+ "spark_throwable",
+ b"spark_throwable",
"stack_trace",
b"stack_trace",
],
) -> None: ...
+ @typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_cause_idx", b"_cause_idx"]
) -> typing_extensions.Literal["cause_idx"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_spark_throwable", b"_spark_throwable"]
+ ) -> typing_extensions.Literal["spark_throwable"] | None: ...
ROOT_ERROR_IDX_FIELD_NUMBER: builtins.int
ERRORS_FIELD_NUMBER: builtins.int