This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e720cce1813e [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message
e720cce1813e is described below

commit e720cce1813e384847d4ef0bac48a202b2e39848
Author: Yihong He <[email protected]>
AuthorDate: Fri Oct 13 08:36:51 2023 +0900

    [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message
    
    ### What changes were proposed in this pull request?
    
    - Include QueryContext in SparkThrowable proto message
    - Reconstruct QueryContext for SparkThrowable exceptions on the client side
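
    For illustration, a minimal client-side sketch of what the reconstructed context enables (the `spark` session value and the sample query are illustrative assumptions, not part of this patch; `getQueryContext` and the `QueryContext` accessors are what this change wires up):

    ```scala
    import org.apache.spark.sql.AnalysisException

    try {
      // "x" is an unresolved column; its fragment starts at index 7 of the query text.
      spark.sql("select x").collect()
    } catch {
      case ex: AnalysisException =>
        // The query context is rebuilt on the client from the SparkThrowable proto message.
        for (ctx <- ex.getQueryContext) {
          // objectType()/objectName() are empty strings when the error comes from the main query.
          println(s"fragment='${ctx.fragment()}' at [${ctx.startIndex()}, ${ctx.stopIndex()}]")
        }
    }
    ```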
    
    ### Why are the changes needed?
    
    - Better integration with the error framework
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #43352 from heyihong/SPARK-45516.
    
    Lead-authored-by: Yihong He <[email protected]>
    Co-authored-by: Yihong He <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  4 ++
 .../src/main/protobuf/spark/connect/base.proto     | 28 ++++++++-
 .../connect/client/GrpcExceptionConverter.scala    | 50 ++++++++++++----
 .../spark/sql/connect/utils/ErrorUtils.scala       | 11 ++++
 python/pyspark/sql/connect/proto/base_pb2.py       | 22 +++----
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 69 +++++++++++++++++++++-
 6 files changed, 159 insertions(+), 25 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 6e0a04cf4eb4..04d284f2ec23 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
         assert(!ex.messageParameters.isEmpty)
         assert(ex.getSqlState != null)
         assert(!ex.isInternalError)
+        assert(ex.getQueryContext.length == 1)
+        assert(ex.getQueryContext.head.startIndex() == 7)
+        assert(ex.getQueryContext.head.stopIndex() == 7)
+        assert(ex.getQueryContext.head.fragment() == "x")
         assert(
          ex.getStackTrace
            .find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 5b8858f40d26..273512272225 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -819,13 +819,39 @@ message FetchErrorDetailsResponse {
     int32 line_number = 4;
   }
 
+  // QueryContext defines the schema for the query context of a SparkThrowable.
+  // It helps users understand where the error occurs while executing queries.
+  message QueryContext {
+    // The object type of the query which throws the exception.
+    // If the exception is directly from the main query, it should be an empty string.
+    // Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
+    string object_type = 1;
+
+    // The object name of the query which throws the exception.
+    // If the exception is directly from the main query, it should be an empty string.
+    // Otherwise, it should be the object name. For example, a view name "V1".
+    string object_name = 2;
+
+    // The starting index in the query text which throws the exception. The index starts from 0.
+    int32 start_index = 3;
+
+    // The stopping index in the query which throws the exception. The index starts from 0.
+    int32 stop_index = 4;
+
+    // The corresponding fragment of the query which throws the exception.
+    string fragment = 5;
+  }
+
   // SparkThrowable defines the schema for SparkThrowable exceptions.
   message SparkThrowable {
    // Succinct, human-readable, unique, and consistent representation of the error category.
     optional string error_class = 1;
 
-    // message parameters for the error framework.
+    // The message parameters for the error framework.
     map<string, string> message_parameters = 2;
+
+    // The query context of a SparkThrowable.
+    repeated QueryContext query_contexts = 3;
   }
 
   // Error defines the schema for the representing exception.
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 85a523a13729..a9a6102a0fe6 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods
 
-import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{QueryContext, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
 import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
 import org.apache.spark.internal.Logging
@@ -167,10 +167,12 @@ private object GrpcExceptionConverter {
   private case class ErrorParams(
       message: String,
       cause: Option[Throwable],
-      // errorClass will only be set if the error is enriched and SparkThrowable.
+      // errorClass will only be set if the error is both enriched and SparkThrowable.
       errorClass: Option[String],
-      // messageParameters will only be set if the error is enriched and SparkThrowable.
-      messageParameters: Map[String, String])
+      // messageParameters will only be set if the error is both enriched and SparkThrowable.
+      messageParameters: Map[String, String],
+      // queryContext will only be set if the error is both enriched and SparkThrowable.
+      queryContext: Array[QueryContext])
 
   private def errorConstructor[T <: Throwable: ClassTag](
       throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -192,13 +194,15 @@ private object GrpcExceptionConverter {
         Origin(),
         Origin(),
         errorClass = params.errorClass,
-        messageParameters = params.messageParameters)),
+        messageParameters = params.messageParameters,
+        queryContext = params.queryContext)),
     errorConstructor(params =>
       new AnalysisException(
         params.message,
         cause = params.cause,
         errorClass = params.errorClass,
-        messageParameters = params.messageParameters)),
+        messageParameters = params.messageParameters,
+        context = params.queryContext)),
    errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
    errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
    errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
@@ -221,7 +225,8 @@ private object GrpcExceptionConverter {
         message = params.message,
         cause = params.cause.orNull,
         errorClass = params.errorClass,
-        messageParameters = params.messageParameters)))
+        messageParameters = params.messageParameters,
+        context = params.queryContext)))
 
   /**
   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
@@ -247,16 +252,35 @@ private object GrpcExceptionConverter {
     val causeOpt =
      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
 
+    val errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
+      Some(error.getSparkThrowable.getErrorClass)
+    } else None
+
+    val messageParameters = if (error.hasSparkThrowable) {
+      error.getSparkThrowable.getMessageParametersMap.asScala.toMap
+    } else Map.empty[String, String]
+
+    val queryContext = error.getSparkThrowable.getQueryContextsList.asScala.map { queryCtx =>
+      new QueryContext {
+        override def objectType(): String = queryCtx.getObjectType
+
+        override def objectName(): String = queryCtx.getObjectName
+
+        override def startIndex(): Int = queryCtx.getStartIndex
+
+        override def stopIndex(): Int = queryCtx.getStopIndex
+
+        override def fragment(): String = queryCtx.getFragment
+      }
+    }.toArray
+
     val exception = constructor(
       ErrorParams(
         message = error.getMessage,
         cause = causeOpt,
-        errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
-          Some(error.getSparkThrowable.getErrorClass)
-        } else None,
-        messageParameters = if (error.hasSparkThrowable) {
-          error.getSparkThrowable.getMessageParametersMap.asScala.toMap
-        } else Map.empty))
+        errorClass = errorClass,
+        messageParameters = messageParameters,
+        queryContext = queryContext))
 
     if (!error.getStackTraceList.isEmpty) {
      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 6e9058952360..875b5bd5b9cc 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -115,6 +115,17 @@ private[connect] object ErrorUtils extends Logging {
           if (sparkThrowable.getErrorClass != null) {
             sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass)
           }
+          for (queryCtx <- sparkThrowable.getQueryContext) {
+            sparkThrowableBuilder.addQueryContexts(
+              FetchErrorDetailsResponse.QueryContext
+                .newBuilder()
+                .setObjectType(queryCtx.objectType())
+                .setObjectName(queryCtx.objectName())
+                .setStartIndex(queryCtx.startIndex())
+                .setStopIndex(queryCtx.stopIndex())
+                .setFragment(queryCtx.fragment())
+                .build())
+          }
          sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters)
           builder.setSparkThrowable(sparkThrowableBuilder.build())
         case _ =>
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 24d779fdf2e7..77c660c36c8d 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -202,15 +202,17 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _FETCHERRORDETAILSREQUEST._serialized_start = 11358
     _FETCHERRORDETAILSREQUEST._serialized_end = 11559
     _FETCHERRORDETAILSRESPONSE._serialized_start = 11562
-    _FETCHERRORDETAILSRESPONSE._serialized_end = 12520
+    _FETCHERRORDETAILSRESPONSE._serialized_end = 12789
     _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 11707
     _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 11881
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 11884
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 12151
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 12067
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 12135
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 12154
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12501
-    _SPARKCONNECTSERVICE._serialized_start = 12523
-    _SPARKCONNECTSERVICE._serialized_end = 13372
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 11884
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 12056
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 12059
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 12420
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 12336
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 12404
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 12423
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 12770
+    _SPARKCONNECTSERVICE._serialized_start = 12792
+    _SPARKCONNECTSERVICE._serialized_end = 13641
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 9ecf0cade45a..b1b09b1a2c2c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2869,6 +2869,59 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
            self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
         ) -> typing_extensions.Literal["file_name"] | None: ...
 
+    class QueryContext(google.protobuf.message.Message):
+        """QueryContext defines the schema for the query context of a SparkThrowable.
+        It helps users understand where the error occurs while executing queries.
+        """
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        OBJECT_TYPE_FIELD_NUMBER: builtins.int
+        OBJECT_NAME_FIELD_NUMBER: builtins.int
+        START_INDEX_FIELD_NUMBER: builtins.int
+        STOP_INDEX_FIELD_NUMBER: builtins.int
+        FRAGMENT_FIELD_NUMBER: builtins.int
+        object_type: builtins.str
+        """The object type of the query which throws the exception.
+        If the exception is directly from the main query, it should be an empty string.
+        Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
+        """
+        object_name: builtins.str
+        """The object name of the query which throws the exception.
+        If the exception is directly from the main query, it should be an empty string.
+        Otherwise, it should be the object name. For example, a view name "V1".
+        """
+        start_index: builtins.int
+        """The starting index in the query text which throws the exception. The index starts from 0."""
+        stop_index: builtins.int
+        """The stopping index in the query which throws the exception. The index starts from 0."""
+        fragment: builtins.str
+        """The corresponding fragment of the query which throws the exception."""
+        def __init__(
+            self,
+            *,
+            object_type: builtins.str = ...,
+            object_name: builtins.str = ...,
+            start_index: builtins.int = ...,
+            stop_index: builtins.int = ...,
+            fragment: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "fragment",
+                b"fragment",
+                "object_name",
+                b"object_name",
+                "object_type",
+                b"object_type",
+                "start_index",
+                b"start_index",
+                "stop_index",
+                b"stop_index",
+            ],
+        ) -> None: ...
+
     class SparkThrowable(google.protobuf.message.Message):
         """SparkThrowable defines the schema for SparkThrowable exceptions."""
 
@@ -2893,18 +2946,30 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
 
         ERROR_CLASS_FIELD_NUMBER: builtins.int
         MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int
+        QUERY_CONTEXTS_FIELD_NUMBER: builtins.int
        error_class: builtins.str
        """Succinct, human-readable, unique, and consistent representation of the error category."""
         @property
         def message_parameters(
             self,
        ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
-            """message parameters for the error framework."""
+            """The message parameters for the error framework."""
+        @property
+        def query_contexts(
+            self,
+        ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+            global___FetchErrorDetailsResponse.QueryContext
+        ]:
+            """The query context of a SparkThrowable."""
         def __init__(
             self,
             *,
             error_class: builtins.str | None = ...,
            message_parameters: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+            query_contexts: collections.abc.Iterable[
+                global___FetchErrorDetailsResponse.QueryContext
+            ]
+            | None = ...,
         ) -> None: ...
         def HasField(
             self,
@@ -2921,6 +2986,8 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
                 b"error_class",
                 "message_parameters",
                 b"message_parameters",
+                "query_contexts",
+                b"query_contexts",
             ],
         ) -> None: ...
         def WhichOneof(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
