This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e720cce1813e [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message
e720cce1813e is described below
commit e720cce1813e384847d4ef0bac48a202b2e39848
Author: Yihong He <[email protected]>
AuthorDate: Fri Oct 13 08:36:51 2023 +0900
[SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message
### What changes were proposed in this pull request?
- Include QueryContext in SparkThrowable proto message
- Reconstruct QueryContext for SparkThrowable exceptions on the client side (see the usage sketch below)
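For illustration, a minimal client-side sketch of what the reconstructed context enables. The session setup and the failing query are assumptions; `AnalysisException` and the `QueryContext` accessors are the interfaces touched in this change:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Hypothetical Spark Connect session; adjust the connection string as needed.
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

try {
  spark.sql("select x").collect() // assumed query that fails analysis
} catch {
  case e: AnalysisException =>
    // getQueryContext is now populated on the client from the proto message.
    e.getQueryContext.foreach { ctx =>
      println(s"${ctx.objectType()} ${ctx.objectName()} " +
        s"[${ctx.startIndex()}, ${ctx.stopIndex()}]: '${ctx.fragment()}'")
    }
}
```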
### Why are the changes needed?
- Better integration with the error framework: Spark Connect clients can now inspect the query context (object type/name, fragment, and 0-based start/stop offsets) of server-side SparkThrowable errors
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`
### Was this patch authored or co-authored using generative AI tooling?
Closes #43352 from heyihong/SPARK-45516.
Lead-authored-by: Yihong He <[email protected]>
Co-authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 4 ++
.../src/main/protobuf/spark/connect/base.proto | 28 ++++++++-
.../connect/client/GrpcExceptionConverter.scala | 50 ++++++++++++----
.../spark/sql/connect/utils/ErrorUtils.scala | 11 ++++
python/pyspark/sql/connect/proto/base_pb2.py | 22 +++----
python/pyspark/sql/connect/proto/base_pb2.pyi | 69 +++++++++++++++++++++-
6 files changed, 159 insertions(+), 25 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 6e0a04cf4eb4..04d284f2ec23 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
assert(!ex.messageParameters.isEmpty)
assert(ex.getSqlState != null)
assert(!ex.isInternalError)
+ assert(ex.getQueryContext.length == 1)
+ assert(ex.getQueryContext.head.startIndex() == 7)
+ assert(ex.getQueryContext.head.stopIndex() == 7)
+ assert(ex.getQueryContext.head.fragment() == "x")
assert(
ex.getStackTrace
.find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
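A note on the asserted offsets, assuming the query under test is of the shape `"select x"`: the unresolved fragment `x` occupies the single 0-based character offset 7 of the query text, which is why startIndex and stopIndex coincide. Illustrative only:

```scala
// 0-based offsets into an assumed query string.
val query = "select x"
assert(query.charAt(7) == 'x') // fragment "x" spans [startIndex = 7, stopIndex = 7]
```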
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 5b8858f40d26..273512272225 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -819,13 +819,39 @@ message FetchErrorDetailsResponse {
int32 line_number = 4;
}
+ // QueryContext defines the schema for the query context of a SparkThrowable.
+ // It helps users understand where the error occurs while executing queries.
+ message QueryContext {
+ // The object type of the query which throws the exception.
+ // If the exception is directly from the main query, it should be an empty string.
+ // Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
+ string object_type = 1;
+
+ // The object name of the query which throws the exception.
+ // If the exception is directly from the main query, it should be an empty string.
+ // Otherwise, it should be the object name. For example, a view name "V1".
+ string object_name = 2;
+
+ // The starting index in the query text which throws the exception. The index starts from 0.
+ int32 start_index = 3;
+
+ // The stopping index in the query which throws the exception. The index starts from 0.
+ int32 stop_index = 4;
+
+ // The corresponding fragment of the query which throws the exception.
+ string fragment = 5;
+ }
+
// SparkThrowable defines the schema for SparkThrowable exceptions.
message SparkThrowable {
// Succinct, human-readable, unique, and consistent representation of the error category.
optional string error_class = 1;
- // message parameters for the error framework.
+ // The message parameters for the error framework.
map<string, string> message_parameters = 2;
+
+ // The query context of a SparkThrowable.
+ repeated QueryContext query_contexts = 3;
}
// Error defines the schema for the representing exception.
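A quick round-trip sketch of the new repeated field through the generated builder API (builder/accessor names follow standard protobuf Java codegen; the error class value is an assumption for illustration). The server-side population and client-side consumption appear in the ErrorUtils and GrpcExceptionConverter hunks below:

```scala
import org.apache.spark.connect.proto.FetchErrorDetailsResponse

// Build a SparkThrowable message carrying one QueryContext, then read it back.
val throwableProto = FetchErrorDetailsResponse.SparkThrowable
  .newBuilder()
  .setErrorClass("UNRESOLVED_COLUMN.WITH_SUGGESTION") // assumed error class
  .addQueryContexts(
    FetchErrorDetailsResponse.QueryContext
      .newBuilder()
      .setObjectType("") // empty: the error comes directly from the main query
      .setObjectName("")
      .setStartIndex(7)
      .setStopIndex(7)
      .setFragment("x")
      .build())
  .build()

assert(throwableProto.getQueryContextsCount == 1)
assert(throwableProto.getQueryContexts(0).getFragment == "x")
```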
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 85a523a13729..a9a6102a0fe6 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods
-import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{QueryContext, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.internal.Logging
@@ -167,10 +167,12 @@ private object GrpcExceptionConverter {
private case class ErrorParams(
message: String,
cause: Option[Throwable],
- // errorClass will only be set if the error is enriched and SparkThrowable.
+ // errorClass will only be set if the error is both enriched and SparkThrowable.
errorClass: Option[String],
- // messageParameters will only be set if the error is enriched and SparkThrowable.
- messageParameters: Map[String, String])
+ // messageParameters will only be set if the error is both enriched and SparkThrowable.
+ messageParameters: Map[String, String],
+ // queryContext will only be set if the error is both enriched and SparkThrowable.
+ queryContext: Array[QueryContext])
private def errorConstructor[T <: Throwable: ClassTag](
throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -192,13 +194,15 @@ private object GrpcExceptionConverter {
Origin(),
Origin(),
errorClass = params.errorClass,
- messageParameters = params.messageParameters)),
+ messageParameters = params.messageParameters,
+ queryContext = params.queryContext)),
errorConstructor(params =>
new AnalysisException(
params.message,
cause = params.cause,
errorClass = params.errorClass,
- messageParameters = params.messageParameters)),
+ messageParameters = params.messageParameters,
+ context = params.queryContext)),
errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
@@ -221,7 +225,8 @@ private object GrpcExceptionConverter {
message = params.message,
cause = params.cause.orNull,
errorClass = params.errorClass,
- messageParameters = params.messageParameters)))
+ messageParameters = params.messageParameters,
+ context = params.queryContext)))
/**
* errorsToThrowable reconstructs the exception based on a list of protobuf messages
@@ -247,16 +252,35 @@ private object GrpcExceptionConverter {
val causeOpt =
if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
+ val errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
+ Some(error.getSparkThrowable.getErrorClass)
+ } else None
+
+ val messageParameters = if (error.hasSparkThrowable) {
+ error.getSparkThrowable.getMessageParametersMap.asScala.toMap
+ } else Map.empty[String, String]
+
+ val queryContext = error.getSparkThrowable.getQueryContextsList.asScala.map { queryCtx =>
+ new QueryContext {
+ override def objectType(): String = queryCtx.getObjectType
+
+ override def objectName(): String = queryCtx.getObjectName
+
+ override def startIndex(): Int = queryCtx.getStartIndex
+
+ override def stopIndex(): Int = queryCtx.getStopIndex
+
+ override def fragment(): String = queryCtx.getFragment
+ }
+ }.toArray
+
val exception = constructor(
ErrorParams(
message = error.getMessage,
cause = causeOpt,
- errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
- Some(error.getSparkThrowable.getErrorClass)
- } else None,
- messageParameters = if (error.hasSparkThrowable) {
- error.getSparkThrowable.getMessageParametersMap.asScala.toMap
- } else Map.empty))
+ errorClass = errorClass,
+ messageParameters = messageParameters,
+ queryContext = queryContext))
if (!error.getStackTraceList.isEmpty) {
exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 6e9058952360..875b5bd5b9cc 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -115,6 +115,17 @@ private[connect] object ErrorUtils extends Logging {
if (sparkThrowable.getErrorClass != null) {
sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass)
}
+ for (queryCtx <- sparkThrowable.getQueryContext) {
+ sparkThrowableBuilder.addQueryContexts(
+ FetchErrorDetailsResponse.QueryContext
+ .newBuilder()
+ .setObjectType(queryCtx.objectType())
+ .setObjectName(queryCtx.objectName())
+ .setStartIndex(queryCtx.startIndex())
+ .setStopIndex(queryCtx.stopIndex())
+ .setFragment(queryCtx.fragment())
+ .build())
+ }
sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters)
builder.setSparkThrowable(sparkThrowableBuilder.build())
case _ =>
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 24d779fdf2e7..77c660c36c8d 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -202,15 +202,17 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_FETCHERRORDETAILSREQUEST._serialized_start = 11358
_FETCHERRORDETAILSREQUEST._serialized_end = 11559
_FETCHERRORDETAILSRESPONSE._serialized_start = 11562
- _FETCHERRORDETAILSRESPONSE._serialized_end = 12520
+ _FETCHERRORDETAILSRESPONSE._serialized_end = 12789
_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 11707
_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 11881
- _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 11884
- _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 12151
- _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 12067
- _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 12135
- _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 12154
- _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12501
- _SPARKCONNECTSERVICE._serialized_start = 12523
- _SPARKCONNECTSERVICE._serialized_end = 13372
+ _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 11884
+ _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 12056
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 12059
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 12420
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 12336
+ _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 12404
+ _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 12423
+ _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12770
+ _SPARKCONNECTSERVICE._serialized_start = 12792
+ _SPARKCONNECTSERVICE._serialized_end = 13641
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 9ecf0cade45a..b1b09b1a2c2c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2869,6 +2869,59 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
) -> typing_extensions.Literal["file_name"] | None: ...
+ class QueryContext(google.protobuf.message.Message):
+ """QueryContext defines the schema for the query context of a
SparkThrowable.
+ It helps users understand where the error occurs while executing
queries.
+ """
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ OBJECT_TYPE_FIELD_NUMBER: builtins.int
+ OBJECT_NAME_FIELD_NUMBER: builtins.int
+ START_INDEX_FIELD_NUMBER: builtins.int
+ STOP_INDEX_FIELD_NUMBER: builtins.int
+ FRAGMENT_FIELD_NUMBER: builtins.int
+ object_type: builtins.str
+ """The object type of the query which throws the exception.
+ If the exception is directly from the main query, it should be an empty string.
+ Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
+ """
+ object_name: builtins.str
+ """The object name of the query which throws the exception.
+ If the exception is directly from the main query, it should be an empty string.
+ Otherwise, it should be the object name. For example, a view name "V1".
+ """
+ start_index: builtins.int
+ """The starting index in the query text which throws the exception.
The index starts from 0."""
+ stop_index: builtins.int
+ """The stopping index in the query which throws the exception. The
index starts from 0."""
+ fragment: builtins.str
+ """The corresponding fragment of the query which throws the
exception."""
+ def __init__(
+ self,
+ *,
+ object_type: builtins.str = ...,
+ object_name: builtins.str = ...,
+ start_index: builtins.int = ...,
+ stop_index: builtins.int = ...,
+ fragment: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "fragment",
+ b"fragment",
+ "object_name",
+ b"object_name",
+ "object_type",
+ b"object_type",
+ "start_index",
+ b"start_index",
+ "stop_index",
+ b"stop_index",
+ ],
+ ) -> None: ...
+
class SparkThrowable(google.protobuf.message.Message):
"""SparkThrowable defines the schema for SparkThrowable exceptions."""
@@ -2893,18 +2946,30 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
ERROR_CLASS_FIELD_NUMBER: builtins.int
MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int
+ QUERY_CONTEXTS_FIELD_NUMBER: builtins.int
error_class: builtins.str
"""Succinct, human-readable, unique, and consistent representation of
the error category."""
@property
def message_parameters(
self,
) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
- """message parameters for the error framework."""
+ """The message parameters for the error framework."""
+ @property
+ def query_contexts(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ global___FetchErrorDetailsResponse.QueryContext
+ ]:
+ """The query context of a SparkThrowable."""
def __init__(
self,
*,
error_class: builtins.str | None = ...,
message_parameters: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+ query_contexts: collections.abc.Iterable[
+ global___FetchErrorDetailsResponse.QueryContext
+ ]
+ | None = ...,
) -> None: ...
def HasField(
self,
@@ -2921,6 +2986,8 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
b"error_class",
"message_parameters",
b"message_parameters",
+ "query_contexts",
+ b"query_contexts",
],
) -> None: ...
def WhichOneof(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]