This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8acdc7a83803 [SPARK-54887][CONNECT] Always set a sql state in spark
connect client
8acdc7a83803 is described below
commit 8acdc7a83803145cd0a40f631223ef6055513ec4
Author: Garland Zhang <[email protected]>
AuthorDate: Tue Jan 27 09:51:00 2026 +0100
[SPARK-54887][CONNECT] Always set a sql state in spark connect client
### What changes were proposed in this pull request?
Ensure that every error thrown from the Spark Connect client carries a SQL
state, by providing a fallback error class and a fallback SQL state when the
server response does not supply them.
```
+--------------+--------------+------------------------------------------------+
| errorClass   | sqlState     | Description                                    |
+--------------+--------------+------------------------------------------------+
| null         | null         | Set errorClass to                              |
|              |              | CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE.   |
|              |              | sqlState will be read from JSON file as "XXKCM"|
+--------------+--------------+------------------------------------------------+
| null         | not null     | Do nothing since sqlState is already provided. |
+--------------+--------------+------------------------------------------------+
| not null     | null         | Try to read sqlState from error class JSON     |
|              |              | file using errorClass. If not found, the       |
|              |              | client is out of date so fallback to "XXKCM".  |
+--------------+--------------+------------------------------------------------+
| not null     | not null     | Do nothing since the error is fully            |
|              |              | constructed.                                   |
+--------------+--------------+------------------------------------------------+
```
Added constructor overloads to several exception classes so that they also
accept additional parameters such as sqlState.
Note: This PR introduces a behavior change for non-Spark exceptions thrown
from the gRPC layer: all exceptions thrown from the gRPC layer will now be some
form of SparkThrowable (this ensures a SQL state is always defined).
### Why are the changes needed?
At the moment there are some cases where an exception is thrown without an
errorClass, and therefore without a SQL state. This goes against the goal of
providing a SQL state for every exception, which is part of the effort towards
better SQL error messages.
### Does this PR introduce _any_ user-facing change?
Yes. Errors that were previously thrown without an error class now carry one.
### How was this patch tested?
Unit testing
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes #52589 from garlandz-db/SPARK-53883.
Authored-by: Garland Zhang <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +
.../src/main/resources/error/error-states.json | 6 +
.../scala/org/apache/spark/SparkException.scala | 121 ++++++++++++-
.../sql/streaming/StreamingQueryException.scala | 17 ++
.../org/apache/spark/sql/AnalysisException.scala | 17 ++
.../spark/sql/connect/ClientE2ETestSuite.scala | 4 +-
.../connect/client/SparkConnectClientSuite.scala | 7 +-
.../connect/client/GrpcExceptionConverter.scala | 194 ++++++++++++++++-----
8 files changed, 323 insertions(+), 49 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 430cd1ea2000..630bad76f6cd 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -912,6 +912,12 @@
},
"sqlState" : "56K00"
},
+ "CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" : {
+ "message" : [
+ "Unidentified Error: <message>"
+ ],
+ "sqlState" : "XXKCM"
+ },
"CONNECT_INVALID_PLAN" : {
"message" : [
"The Spark Connect plan is invalid."
diff --git a/common/utils/src/main/resources/error/error-states.json
b/common/utils/src/main/resources/error/error-states.json
index 4fddbeed4090..7b3050bd2266 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7524,6 +7524,12 @@
"standard": "N",
"usedBy": ["PostgreSQL", "Redshift"]
},
+ "XXKCM": {
+ "description": "Connect Client - Unexpected missing SQL state",
+ "origin": "Spark",
+ "standard": "N",
+ "usedBy": ["Spark"]
+ },
"XXKD0": {
"description": "Analysis - Bad plan",
"origin": "Databricks",
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index fc4b3e50be4b..9566826008f2 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -198,6 +198,20 @@ private[spark] class SparkUpgradeException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ cause: Throwable,
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters),
+ Option(cause),
+ Option(errorClass),
+ messageParameters,
+ sqlState
+ )
+ }
+
override def getMessageParameters: java.util.Map[String, String] =
messageParameters.asJava
override def getCondition: String = errorClass.orNull
@@ -229,6 +243,20 @@ private[spark] class SparkArithmeticException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+ Option(errorClass),
+ messageParameters,
+ context,
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: Map[String, String],
@@ -263,6 +291,18 @@ private[spark] class SparkUnsupportedOperationException
private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters),
+ Option(errorClass),
+ messageParameters,
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: java.util.Map[String, String]) =
@@ -273,7 +313,8 @@ private[spark] class SparkUnsupportedOperationException
private(
this(
SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]),
Option(errorClass),
- Map.empty)
+ Map.empty,
+ None)
}
override def getMessageParameters: java.util.Map[String, String] =
messageParameters.asJava
@@ -376,6 +417,23 @@ private[spark] class SparkDateTimeException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ summary: String,
+ cause: Option[Throwable],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+ Option(errorClass),
+ messageParameters,
+ context,
+ cause.orElse(None),
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: Map[String, String],
@@ -433,6 +491,20 @@ private[spark] class SparkNumberFormatException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+ Option(errorClass),
+ messageParameters,
+ context,
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: Map[String, String],
@@ -475,6 +547,23 @@ private[spark] class SparkIllegalArgumentException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ summary: String,
+ cause: Throwable,
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+ Option(cause),
+ Option(errorClass),
+ messageParameters,
+ context,
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: Map[String, String],
@@ -550,6 +639,22 @@ private[spark] class SparkRuntimeException private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ cause: Throwable,
+ context: Array[QueryContext],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+ Option(cause),
+ Option(errorClass),
+ messageParameters,
+ context,
+ sqlState
+ )
+ }
+
override def getMessageParameters: java.util.Map[String, String] =
messageParameters.asJava
override def getCondition: String = errorClass.orNull
@@ -658,6 +763,20 @@ private[spark] class SparkArrayIndexOutOfBoundsException
private(
)
}
+ def this(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ sqlState: Option[String]) = {
+ this(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+ Option(errorClass),
+ messageParameters,
+ context,
+ sqlState
+ )
+ }
+
def this(
errorClass: String,
messageParameters: Map[String, String],
diff --git
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index b30260b8d6cc..3abff53f3265 100644
---
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -58,6 +58,23 @@ class StreamingQueryException private[sql](
messageParameters)
}
+ private[spark] def this(
+ message: String,
+ cause: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ sqlState: Option[String]) = {
+ this(
+ messageParameters.getOrElse("queryDebugString", ""),
+ message,
+ cause,
+ messageParameters.getOrElse("startOffset", ""),
+ messageParameters.getOrElse("endOffset", ""),
+ errorClass,
+ messageParameters,
+ sqlState)
+ }
+
def this(
queryDebugString: String,
cause: Throwable,
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index ea45f20a0593..12eebf866a1f 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -138,6 +138,23 @@ class AnalysisException protected (
context = origin.getQueryContext,
cause = cause)
+ def this(
+ message: String,
+ cause: Option[Throwable],
+ errorClass: Option[String],
+ messageParameters: Map[String, String],
+ context: Array[QueryContext],
+ sqlState: Option[String]) =
+ this(
+ message = message,
+ line = None,
+ startPosition = None,
+ cause = cause,
+ errorClass = errorClass,
+ messageParameters = messageParameters,
+ context = context,
+ sqlState = sqlState)
+
def copy(
message: String,
line: Option[Int],
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index b9f72badd45f..8161e327569a 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -132,8 +132,8 @@ class ClientE2ETestSuite
assert(ex.getCause.isInstanceOf[SparkException])
val cause = ex.getCause.asInstanceOf[SparkException]
- assert(cause.getCondition == null)
- assert(cause.getMessageParameters.isEmpty)
+ assert(cause.getCondition ==
"CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE")
+ assert(cause.getMessageParameters.asScala == Map("message" ->
"test".repeat(10000)))
assert(cause.getMessage.contains("test".repeat(10000)))
}
}
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 20d1187d2a8f..98fc5dd78ee4 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -227,13 +227,15 @@ class SparkConnectClientSuite extends ConnectFunSuite {
cause = None,
errorClass = Some("DUPLICATE_KEY"),
messageParameters = Map("keyColumn" -> "`abc`"),
- queryContext = Array.empty)
+ queryContext = Array.empty,
+ sqlState = None)
val error = constructor(testParams).asInstanceOf[Throwable with
SparkThrowable]
assert(error.getMessage.contains(testParams.message))
assert(error.getCause == null)
assert(error.getCondition == testParams.errorClass.get)
assert(error.getMessageParameters.asScala ==
testParams.messageParameters)
assert(error.getQueryContext.isEmpty)
+ assert(error.getSqlState == "23505")
}
}
@@ -244,7 +246,8 @@ class SparkConnectClientSuite extends ConnectFunSuite {
cause = None,
errorClass = None,
messageParameters = Map.empty,
- queryContext = Array.empty)
+ queryContext = Array.empty,
+ sqlState = None)
val error = constructor(testParams)
assert(error.getMessage.contains(testParams.message))
assert(error.getCause == null)
diff --git
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 7e0b0949fcf1..5bcf4c9acd40 100644
---
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods
-import org.apache.spark.{QueryContext, QueryContextType,
SparkArithmeticException, SparkArrayIndexOutOfBoundsException,
SparkDateTimeException, SparkException, SparkIllegalArgumentException,
SparkNumberFormatException, SparkRuntimeException,
SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{QueryContext, QueryContextType,
SparkArithmeticException, SparkArrayIndexOutOfBoundsException,
SparkDateTimeException, SparkException, SparkIllegalArgumentException,
SparkNumberFormatException, SparkRuntimeException, SparkThrowableHelper,
SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.connect.proto.{FetchErrorDetailsRequest,
FetchErrorDetailsResponse, SparkConnectServiceGrpc, UserContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
@@ -160,12 +160,45 @@ private[client] class GrpcExceptionConverter(channel:
ManagedChannel) extends Lo
}
// If no ErrorInfo is found, create a SparkException based on the
StatusRuntimeException.
- new SparkException(ex.toString, ex.getCause)
+ new SparkException(
+ message = ex.toString,
+ cause = ex.getCause,
+ errorClass = Some("CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE"),
+ messageParameters = Map("message" -> ex.toString),
+ context = Array.empty)
}
}
private[client] object GrpcExceptionConverter {
+ /**
+ * Error Class and SQL State Resolution Logic
+ * ===========================================
+ *
+ * When constructing exceptions from server responses, the errorClass and
sqlState
+ * may or may not be present. The following table describes how they are
resolved:
+ *
+ *
+--------------+--------------+------------------------------------------------+
+ * | errorClass | sqlState | Description
|
+ *
+--------------+--------------+------------------------------------------------+
+ * | null | null | Set errorClass to
|
+ * | | |
CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE. |
+ * | | | sqlState will be read from JSON file as
"XXKCM"|
+ *
+--------------+--------------+------------------------------------------------+
+ * | null | not null | Do nothing since sqlState is already
provided. |
+ *
+--------------+--------------+------------------------------------------------+
+ * | not null | null | Try to read sqlState from error class
JSON |
+ * | | | file using errorClass. If not found, the
|
+ * | | | client is out of date so fallback to
"XXKCM". |
+ *
+--------------+--------------+------------------------------------------------+
+ * | not null | not null | Do nothing since the error is fully
|
+ * | | | constructed.
|
+ *
+--------------+--------------+------------------------------------------------+
+ *
+ * Note: "XXKCM" is the fallback SQL state used when the client cannot read
the
+ * sqlState from the JSON error definitions file (e.g., client is out of
date).
+ */
+
private[client] case class ErrorParams(
message: String,
cause: Option[Throwable],
@@ -174,7 +207,45 @@ private[client] object GrpcExceptionConverter {
// messageParameters will only be set if the error is both enriched and
SparkThrowable.
messageParameters: Map[String, String],
// queryContext will only be set if the error is both enriched and
SparkThrowable.
- queryContext: Array[QueryContext])
+ queryContext: Array[QueryContext],
+ // sqlState will be set if the server provided it (from metadata or
FetchErrorDetails).
+ sqlState: Option[String])
+
+ /**
+ * Resolves errorClass and sqlState based on the above table
+ */
+ private def resolveParams(params: ErrorParams): ErrorParams = {
+ (params.errorClass, params.sqlState) match {
+ case (None, None) =>
+ val fallbackErrorClass = "CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE"
+ val resolvedSqlState =
Option(SparkThrowableHelper.getSqlState(fallbackErrorClass))
+ .orElse(Some("XXKCM"))
+ params.copy(errorClass = Some(fallbackErrorClass), sqlState =
resolvedSqlState)
+ case (None, Some(_)) =>
+ params // Keep as is, sqlState is all we care about
+ case (Some(ec), None) =>
+ val resolvedSqlState = Option(SparkThrowableHelper.getSqlState(ec))
+ .orElse(Some("XXKCM"))
+ params.copy(sqlState = resolvedSqlState)
+ case (Some(_), Some(_)) =>
+ params // Keep as is, already constructed
+ }
+ }
+
+ /**
+ * Returns the errorClass from resolved params. May return null if
errorClass is None and
+ * sqlState is defined (sqlState is all we care about in that case).
+ */
+ private def getErrorClassOrFallback(params: ErrorParams): String = {
+ resolveParams(params).errorClass.orNull
+ }
+
+ /**
+ * Returns the sqlState from resolved params.
+ */
+ private def getSqlStateOrFallback(params: ErrorParams): Option[String] = {
+ resolveParams(params).sqlState
+ }
private def errorConstructor[T <: Throwable: ClassTag](
throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -187,91 +258,112 @@ private[client] object GrpcExceptionConverter {
new StreamingQueryException(
params.message,
params.cause.orNull,
- params.errorClass.orNull,
- params.messageParameters)),
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
+ getSqlStateOrFallback(params))),
errorConstructor(params =>
new ParseException(
None,
Origin(),
- errorClass = params.errorClass.orNull,
- messageParameters = params.messageParameters,
+ errorClass = getErrorClassOrFallback(params),
+ messageParameters = errorParamsToMessageParameters(params),
queryContext = params.queryContext)),
errorConstructor(params =>
new AnalysisException(
- errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3100"),
- messageParameters = errorParamsToMessageParameters(params),
+ message = params.message,
cause = params.cause,
- context = params.queryContext)),
+ errorClass = resolveParams(params).errorClass,
+ messageParameters = errorParamsToMessageParameters(params),
+ context = params.queryContext,
+ sqlState = getSqlStateOrFallback(params))),
errorConstructor(params =>
- new NamespaceAlreadyExistsException(params.errorClass.orNull,
params.messageParameters)),
+ new NamespaceAlreadyExistsException(
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params))),
errorConstructor(params =>
new TableAlreadyExistsException(
- params.errorClass.orNull,
- params.messageParameters,
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
params.cause)),
errorConstructor(params =>
new TempTableAlreadyExistsException(
- params.errorClass.orNull,
- params.messageParameters,
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
params.cause)),
errorConstructor(params =>
new NoSuchDatabaseException(
- params.errorClass.orNull,
- params.messageParameters,
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
params.cause)),
errorConstructor(params =>
- new NoSuchNamespaceException(params.errorClass.orNull,
params.messageParameters)),
+ new NoSuchNamespaceException(
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params))),
errorConstructor(params =>
- new NoSuchTableException(params.errorClass.orNull,
params.messageParameters, params.cause)),
+ new NoSuchTableException(
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
+ params.cause)),
errorConstructor[NumberFormatException](params =>
new SparkNumberFormatException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3104"),
messageParameters = errorParamsToMessageParameters(params),
- params.queryContext)),
+ params.queryContext,
+ getSqlStateOrFallback(params))),
errorConstructor[IllegalArgumentException](params =>
new SparkIllegalArgumentException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3105"),
messageParameters = errorParamsToMessageParameters(params),
params.queryContext,
summary = "",
- cause = params.cause.orNull)),
+ cause = params.cause.orNull,
+ getSqlStateOrFallback(params))),
errorConstructor[ArithmeticException](params =>
new SparkArithmeticException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3106"),
messageParameters = errorParamsToMessageParameters(params),
- params.queryContext)),
+ params.queryContext,
+ getSqlStateOrFallback(params))),
errorConstructor[UnsupportedOperationException](params =>
new SparkUnsupportedOperationException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3107"),
- messageParameters = errorParamsToMessageParameters(params))),
+ messageParameters = errorParamsToMessageParameters(params),
+ getSqlStateOrFallback(params))),
errorConstructor[ArrayIndexOutOfBoundsException](params =>
new SparkArrayIndexOutOfBoundsException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3108"),
messageParameters = errorParamsToMessageParameters(params),
- params.queryContext)),
+ params.queryContext,
+ getSqlStateOrFallback(params))),
errorConstructor[DateTimeException](params =>
new SparkDateTimeException(
errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3109"),
messageParameters = errorParamsToMessageParameters(params),
- params.queryContext)),
+ params.queryContext,
+ summary = "",
+ cause = None,
+ getSqlStateOrFallback(params))),
errorConstructor(params =>
new SparkRuntimeException(
- params.errorClass.orNull,
- params.messageParameters,
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
params.cause.orNull,
- params.queryContext)),
+ params.queryContext,
+ getSqlStateOrFallback(params))),
errorConstructor(params =>
new SparkUpgradeException(
- params.errorClass.orNull,
- params.messageParameters,
- params.cause.orNull)),
+ getErrorClassOrFallback(params),
+ errorParamsToMessageParameters(params),
+ params.cause.orNull,
+ getSqlStateOrFallback(params))),
errorConstructor(params =>
new SparkException(
message = params.message,
cause = params.cause.orNull,
- errorClass = params.errorClass,
- messageParameters = params.messageParameters,
- context = params.queryContext)))
+ errorClass = Option(getErrorClassOrFallback(params)),
+ messageParameters = errorParamsToMessageParameters(params),
+ context = params.queryContext,
+ sqlState = getSqlStateOrFallback(params))))
/**
* errorsToThrowable reconstructs the exception based on a list of protobuf
messages
@@ -301,6 +393,10 @@ private[client] object GrpcExceptionConverter {
Some(error.getSparkThrowable.getErrorClass)
} else None
+ val sqlState = if (error.hasSparkThrowable &&
error.getSparkThrowable.hasSqlState) {
+ Some(error.getSparkThrowable.getSqlState)
+ } else None
+
val messageParameters = if (error.hasSparkThrowable) {
error.getSparkThrowable.getMessageParametersMap.asScala.toMap
} else Map.empty[String, String]
@@ -328,7 +424,8 @@ private[client] object GrpcExceptionConverter {
cause = causeOpt,
errorClass = errorClass,
messageParameters = messageParameters,
- queryContext = queryContext))
+ queryContext = queryContext,
+ sqlState = sqlState))
if (!error.getStackTraceList.isEmpty) {
exception.setStackTrace(error.getStackTraceList.asScala.toArray.map {
stackTraceElement =>
@@ -352,6 +449,7 @@ private[client] object GrpcExceptionConverter {
val classes =
JsonMethods.parse(info.getMetadataOrDefault("classes",
"[]")).extract[Array[String]]
val errorClass = info.getMetadataOrDefault("errorClass", null)
+ val sqlState = info.getMetadataOrDefault("sqlState", null)
val builder = FetchErrorDetailsResponse.Error
.newBuilder()
.setMessage(message)
@@ -361,12 +459,16 @@ private[client] object GrpcExceptionConverter {
val messageParameters = JsonMethods
.parse(info.getMetadataOrDefault("messageParameters", "{}"))
.extract[Map[String, String]]
- builder.setSparkThrowable(
- FetchErrorDetailsResponse.SparkThrowable
- .newBuilder()
- .setErrorClass(errorClass)
- .putAllMessageParameters(messageParameters.asJava)
- .build())
+ val sparkThrowableBuilder = FetchErrorDetailsResponse.SparkThrowable
+ .newBuilder()
+ .setErrorClass(errorClass)
+ .putAllMessageParameters(messageParameters.asJava)
+
+ if (sqlState != null) {
+ sparkThrowableBuilder.setSqlState(sqlState)
+ }
+
+ builder.setSparkThrowable(sparkThrowableBuilder.build())
}
errorsToThrowable(0, Seq(builder.build()))
@@ -384,8 +486,12 @@ private[client] object GrpcExceptionConverter {
* params.
*/
private def errorParamsToMessageParameters(params: ErrorParams): Map[String,
String] =
- params.errorClass match {
- case Some(_) => params.messageParameters
- case None => Map("message" -> params.message)
+ resolveParams(params).errorClass match {
+ case Some("CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE") =>
+ Map("message" -> params.message)
+ case Some(_) =>
+ params.messageParameters
+ case None =>
+ Map("message" -> params.message)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]