This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acdc7a83803 [SPARK-54887][CONNECT] Always set a sql state in spark 
connect client
8acdc7a83803 is described below

commit 8acdc7a83803145cd0a40f631223ef6055513ec4
Author: Garland Zhang <[email protected]>
AuthorDate: Tue Jan 27 09:51:00 2026 +0100

    [SPARK-54887][CONNECT] Always set a sql state in spark connect client
    
    ### What changes were proposed in this pull request?
    
    Ensure that every error thrown from the Spark Connect client has an error 
class by providing a fallback error class and a fallback SQL state.
    
    ```
       * 
+--------------+--------------+------------------------------------------------+
       * | errorClass   | sqlState     | Description                            
        |
       * 
+--------------+--------------+------------------------------------------------+
       * | null         | null         | Set errorClass to                      
        |
       * |              |              | 
CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE.   |
       * |              |              | sqlState will be read from JSON file 
as "XXKCM"|
       * 
+--------------+--------------+------------------------------------------------+
       * | null         | not null     | Do nothing since sqlState is already 
provided. |
       * 
+--------------+--------------+------------------------------------------------+
       * | not null     | null         | Try to read sqlState from error class 
JSON     |
       * |              |              | file using errorClass. If not found, 
the       |
       * |              |              | client is out of date so fallback to 
"XXKCM".  |
       * 
+--------------+--------------+------------------------------------------------+
       * | not null     | not null     | Do nothing since the error is fully    
        |
       * |              |              | constructed.                           
        |
       * 
+--------------+--------------+------------------------------------------------+
    ```
    
    Expanded some exception class definitions to also accept more parameters 
like sqlState
    
    Note: This PR introduces a behavior change for non-Spark exceptions thrown 
from the gRPC layer, as all exceptions thrown from the gRPC layer will now be some 
form of SparkThrowable (this ensures we have a SQL state defined).
    
    ### Why are the changes needed?
    
    At the moment there are some cases where an errorClass is not set on a thrown 
exception. This goes against the goal of the better SQL error messages effort, 
which aims to provide a SQL state for every exception.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. An error that previously did not carry an error class now does.
    
    ### How was this patch tested?
    
    Unit testing
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes
    
    Closes #52589 from garlandz-db/SPARK-53883.
    
    Authored-by: Garland Zhang <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 .../src/main/resources/error/error-states.json     |   6 +
 .../scala/org/apache/spark/SparkException.scala    | 121 ++++++++++++-
 .../sql/streaming/StreamingQueryException.scala    |  17 ++
 .../org/apache/spark/sql/AnalysisException.scala   |  17 ++
 .../spark/sql/connect/ClientE2ETestSuite.scala     |   4 +-
 .../connect/client/SparkConnectClientSuite.scala   |   7 +-
 .../connect/client/GrpcExceptionConverter.scala    | 194 ++++++++++++++++-----
 8 files changed, 323 insertions(+), 49 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 430cd1ea2000..630bad76f6cd 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -912,6 +912,12 @@
     },
     "sqlState" : "56K00"
   },
+  "CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" : {
+    "message" : [
+      "Unidentified Error: <message>"
+    ],
+    "sqlState" : "XXKCM"
+  },
   "CONNECT_INVALID_PLAN" : {
     "message" : [
       "The Spark Connect plan is invalid."
diff --git a/common/utils/src/main/resources/error/error-states.json 
b/common/utils/src/main/resources/error/error-states.json
index 4fddbeed4090..7b3050bd2266 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7524,6 +7524,12 @@
         "standard": "N",
         "usedBy": ["PostgreSQL", "Redshift"]
     },
+    "XXKCM": {
+        "description": "Connect Client - Unexpected missing SQL state",
+        "origin": "Spark",
+        "standard": "N",
+        "usedBy": ["Spark"]
+    },
     "XXKD0": {
         "description": "Analysis - Bad plan",
         "origin": "Databricks",
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala 
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index fc4b3e50be4b..9566826008f2 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -198,6 +198,20 @@ private[spark] class SparkUpgradeException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable,
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      sqlState
+    )
+  }
+
   override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
 
   override def getCondition: String = errorClass.orNull
@@ -229,6 +243,20 @@ private[spark] class SparkArithmeticException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+      Option(errorClass),
+      messageParameters,
+      context,
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
@@ -263,6 +291,18 @@ private[spark] class SparkUnsupportedOperationException 
private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      Option(errorClass),
+      messageParameters,
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: java.util.Map[String, String]) =
@@ -273,7 +313,8 @@ private[spark] class SparkUnsupportedOperationException 
private(
     this(
       SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]),
       Option(errorClass),
-      Map.empty)
+      Map.empty,
+      None)
   }
 
   override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
@@ -376,6 +417,23 @@ private[spark] class SparkDateTimeException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext],
+    summary: String,
+    cause: Option[Throwable],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(errorClass),
+      messageParameters,
+      context,
+      cause.orElse(None),
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
@@ -433,6 +491,20 @@ private[spark] class SparkNumberFormatException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+      Option(errorClass),
+      messageParameters,
+      context,
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
@@ -475,6 +547,23 @@ private[spark] class SparkIllegalArgumentException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext],
+    summary: String,
+    cause: Throwable,
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      context,
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
@@ -550,6 +639,22 @@ private[spark] class SparkRuntimeException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable,
+    context: Array[QueryContext],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      context,
+      sqlState
+    )
+  }
+
   override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
 
   override def getCondition: String = errorClass.orNull
@@ -658,6 +763,20 @@ private[spark] class SparkArrayIndexOutOfBoundsException 
private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext],
+    sqlState: Option[String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, ""),
+      Option(errorClass),
+      messageParameters,
+      context,
+      sqlState
+    )
+  }
+
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
diff --git 
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
 
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index b30260b8d6cc..3abff53f3265 100644
--- 
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ 
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -58,6 +58,23 @@ class StreamingQueryException private[sql](
       messageParameters)
   }
 
+  private[spark] def this(
+      message: String,
+      cause: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      sqlState: Option[String]) = {
+    this(
+      messageParameters.getOrElse("queryDebugString", ""),
+      message,
+      cause,
+      messageParameters.getOrElse("startOffset", ""),
+      messageParameters.getOrElse("endOffset", ""),
+      errorClass,
+      messageParameters,
+      sqlState)
+  }
+
   def this(
       queryDebugString: String,
       cause: Throwable,
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index ea45f20a0593..12eebf866a1f 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -138,6 +138,23 @@ class AnalysisException protected (
       context = origin.getQueryContext,
       cause = cause)
 
+  def this(
+      message: String,
+      cause: Option[Throwable],
+      errorClass: Option[String],
+      messageParameters: Map[String, String],
+      context: Array[QueryContext],
+      sqlState: Option[String]) =
+    this(
+      message = message,
+      line = None,
+      startPosition = None,
+      cause = cause,
+      errorClass = errorClass,
+      messageParameters = messageParameters,
+      context = context,
+      sqlState = sqlState)
+
   def copy(
       message: String,
       line: Option[Int],
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index b9f72badd45f..8161e327569a 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -132,8 +132,8 @@ class ClientE2ETestSuite
       assert(ex.getCause.isInstanceOf[SparkException])
 
       val cause = ex.getCause.asInstanceOf[SparkException]
-      assert(cause.getCondition == null)
-      assert(cause.getMessageParameters.isEmpty)
+      assert(cause.getCondition == 
"CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE")
+      assert(cause.getMessageParameters.asScala == Map("message" -> 
"test".repeat(10000)))
       assert(cause.getMessage.contains("test".repeat(10000)))
     }
   }
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 20d1187d2a8f..98fc5dd78ee4 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -227,13 +227,15 @@ class SparkConnectClientSuite extends ConnectFunSuite {
           cause = None,
           errorClass = Some("DUPLICATE_KEY"),
           messageParameters = Map("keyColumn" -> "`abc`"),
-          queryContext = Array.empty)
+          queryContext = Array.empty,
+          sqlState = None)
         val error = constructor(testParams).asInstanceOf[Throwable with 
SparkThrowable]
         assert(error.getMessage.contains(testParams.message))
         assert(error.getCause == null)
         assert(error.getCondition == testParams.errorClass.get)
         assert(error.getMessageParameters.asScala == 
testParams.messageParameters)
         assert(error.getQueryContext.isEmpty)
+        assert(error.getSqlState == "23505")
       }
     }
 
@@ -244,7 +246,8 @@ class SparkConnectClientSuite extends ConnectFunSuite {
           cause = None,
           errorClass = None,
           messageParameters = Map.empty,
-          queryContext = Array.empty)
+          queryContext = Array.empty,
+          sqlState = None)
         val error = constructor(testParams)
         assert(error.getMessage.contains(testParams.message))
         assert(error.getCause == null)
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 7e0b0949fcf1..5bcf4c9acd40 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
 import org.json4s.{DefaultFormats, Formats}
 import org.json4s.jackson.JsonMethods
 
-import org.apache.spark.{QueryContext, QueryContextType, 
SparkArithmeticException, SparkArrayIndexOutOfBoundsException, 
SparkDateTimeException, SparkException, SparkIllegalArgumentException, 
SparkNumberFormatException, SparkRuntimeException, 
SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{QueryContext, QueryContextType, 
SparkArithmeticException, SparkArrayIndexOutOfBoundsException, 
SparkDateTimeException, SparkException, SparkIllegalArgumentException, 
SparkNumberFormatException, SparkRuntimeException, SparkThrowableHelper, 
SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, 
FetchErrorDetailsResponse, SparkConnectServiceGrpc, UserContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
@@ -160,12 +160,45 @@ private[client] class GrpcExceptionConverter(channel: 
ManagedChannel) extends Lo
     }
 
     // If no ErrorInfo is found, create a SparkException based on the 
StatusRuntimeException.
-    new SparkException(ex.toString, ex.getCause)
+    new SparkException(
+      message = ex.toString,
+      cause = ex.getCause,
+      errorClass = Some("CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE"),
+      messageParameters = Map("message" -> ex.toString),
+      context = Array.empty)
   }
 }
 
 private[client] object GrpcExceptionConverter {
 
+  /**
+   * Error Class and SQL State Resolution Logic
+   * ===========================================
+   *
+   * When constructing exceptions from server responses, the errorClass and 
sqlState
+   * may or may not be present. The following table describes how they are 
resolved:
+   *
+   * 
+--------------+--------------+------------------------------------------------+
+   * | errorClass   | sqlState     | Description                               
     |
+   * 
+--------------+--------------+------------------------------------------------+
+   * | null         | null         | Set errorClass to                         
     |
+   * |              |              | 
CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE.   |
+   * |              |              | sqlState will be read from JSON file as 
"XXKCM"|
+   * 
+--------------+--------------+------------------------------------------------+
+   * | null         | not null     | Do nothing since sqlState is already 
provided. |
+   * 
+--------------+--------------+------------------------------------------------+
+   * | not null     | null         | Try to read sqlState from error class 
JSON     |
+   * |              |              | file using errorClass. If not found, the  
     |
+   * |              |              | client is out of date so fallback to 
"XXKCM".  |
+   * 
+--------------+--------------+------------------------------------------------+
+   * | not null     | not null     | Do nothing since the error is fully       
     |
+   * |              |              | constructed.                              
     |
+   * 
+--------------+--------------+------------------------------------------------+
+   *
+   * Note: "XXKCM" is the fallback SQL state used when the client cannot read 
the
+   * sqlState from the JSON error definitions file (e.g., client is out of 
date).
+   */
+
   private[client] case class ErrorParams(
       message: String,
       cause: Option[Throwable],
@@ -174,7 +207,45 @@ private[client] object GrpcExceptionConverter {
       // messageParameters will only be set if the error is both enriched and 
SparkThrowable.
       messageParameters: Map[String, String],
       // queryContext will only be set if the error is both enriched and 
SparkThrowable.
-      queryContext: Array[QueryContext])
+      queryContext: Array[QueryContext],
+      // sqlState will be set if the server provided it (from metadata or 
FetchErrorDetails).
+      sqlState: Option[String])
+
+  /**
+   * Resolves errorClass and sqlState based on the above table
+   */
+  private def resolveParams(params: ErrorParams): ErrorParams = {
+    (params.errorClass, params.sqlState) match {
+      case (None, None) =>
+        val fallbackErrorClass = "CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE"
+        val resolvedSqlState = 
Option(SparkThrowableHelper.getSqlState(fallbackErrorClass))
+          .orElse(Some("XXKCM"))
+        params.copy(errorClass = Some(fallbackErrorClass), sqlState = 
resolvedSqlState)
+      case (None, Some(_)) =>
+        params // Keep as is, sqlState is all we care about
+      case (Some(ec), None) =>
+        val resolvedSqlState = Option(SparkThrowableHelper.getSqlState(ec))
+          .orElse(Some("XXKCM"))
+        params.copy(sqlState = resolvedSqlState)
+      case (Some(_), Some(_)) =>
+        params // Keep as is, already constructed
+    }
+  }
+
+  /**
+   * Returns the errorClass from resolved params. May return null if 
errorClass is None and
+   * sqlState is defined (sqlState is all we care about in that case).
+   */
+  private def getErrorClassOrFallback(params: ErrorParams): String = {
+    resolveParams(params).errorClass.orNull
+  }
+
+  /**
+   * Returns the sqlState from resolved params.
+   */
+  private def getSqlStateOrFallback(params: ErrorParams): Option[String] = {
+    resolveParams(params).sqlState
+  }
 
   private def errorConstructor[T <: Throwable: ClassTag](
       throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -187,91 +258,112 @@ private[client] object GrpcExceptionConverter {
       new StreamingQueryException(
         params.message,
         params.cause.orNull,
-        params.errorClass.orNull,
-        params.messageParameters)),
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
+        getSqlStateOrFallback(params))),
     errorConstructor(params =>
       new ParseException(
         None,
         Origin(),
-        errorClass = params.errorClass.orNull,
-        messageParameters = params.messageParameters,
+        errorClass = getErrorClassOrFallback(params),
+        messageParameters = errorParamsToMessageParameters(params),
         queryContext = params.queryContext)),
     errorConstructor(params =>
       new AnalysisException(
-        errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3100"),
-        messageParameters = errorParamsToMessageParameters(params),
+        message = params.message,
         cause = params.cause,
-        context = params.queryContext)),
+        errorClass = resolveParams(params).errorClass,
+        messageParameters = errorParamsToMessageParameters(params),
+        context = params.queryContext,
+        sqlState = getSqlStateOrFallback(params))),
     errorConstructor(params =>
-      new NamespaceAlreadyExistsException(params.errorClass.orNull, 
params.messageParameters)),
+      new NamespaceAlreadyExistsException(
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params))),
     errorConstructor(params =>
       new TableAlreadyExistsException(
-        params.errorClass.orNull,
-        params.messageParameters,
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
         params.cause)),
     errorConstructor(params =>
       new TempTableAlreadyExistsException(
-        params.errorClass.orNull,
-        params.messageParameters,
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
         params.cause)),
     errorConstructor(params =>
       new NoSuchDatabaseException(
-        params.errorClass.orNull,
-        params.messageParameters,
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
         params.cause)),
     errorConstructor(params =>
-      new NoSuchNamespaceException(params.errorClass.orNull, 
params.messageParameters)),
+      new NoSuchNamespaceException(
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params))),
     errorConstructor(params =>
-      new NoSuchTableException(params.errorClass.orNull, 
params.messageParameters, params.cause)),
+      new NoSuchTableException(
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
+        params.cause)),
     errorConstructor[NumberFormatException](params =>
       new SparkNumberFormatException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3104"),
         messageParameters = errorParamsToMessageParameters(params),
-        params.queryContext)),
+        params.queryContext,
+        getSqlStateOrFallback(params))),
     errorConstructor[IllegalArgumentException](params =>
       new SparkIllegalArgumentException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3105"),
         messageParameters = errorParamsToMessageParameters(params),
         params.queryContext,
         summary = "",
-        cause = params.cause.orNull)),
+        cause = params.cause.orNull,
+        getSqlStateOrFallback(params))),
     errorConstructor[ArithmeticException](params =>
       new SparkArithmeticException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3106"),
         messageParameters = errorParamsToMessageParameters(params),
-        params.queryContext)),
+        params.queryContext,
+        getSqlStateOrFallback(params))),
     errorConstructor[UnsupportedOperationException](params =>
       new SparkUnsupportedOperationException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3107"),
-        messageParameters = errorParamsToMessageParameters(params))),
+        messageParameters = errorParamsToMessageParameters(params),
+        getSqlStateOrFallback(params))),
     errorConstructor[ArrayIndexOutOfBoundsException](params =>
       new SparkArrayIndexOutOfBoundsException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3108"),
         messageParameters = errorParamsToMessageParameters(params),
-        params.queryContext)),
+        params.queryContext,
+        getSqlStateOrFallback(params))),
     errorConstructor[DateTimeException](params =>
       new SparkDateTimeException(
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3109"),
         messageParameters = errorParamsToMessageParameters(params),
-        params.queryContext)),
+        params.queryContext,
+        summary = "",
+        cause = None,
+        getSqlStateOrFallback(params))),
     errorConstructor(params =>
       new SparkRuntimeException(
-        params.errorClass.orNull,
-        params.messageParameters,
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
         params.cause.orNull,
-        params.queryContext)),
+        params.queryContext,
+        getSqlStateOrFallback(params))),
     errorConstructor(params =>
       new SparkUpgradeException(
-        params.errorClass.orNull,
-        params.messageParameters,
-        params.cause.orNull)),
+        getErrorClassOrFallback(params),
+        errorParamsToMessageParameters(params),
+        params.cause.orNull,
+        getSqlStateOrFallback(params))),
     errorConstructor(params =>
       new SparkException(
         message = params.message,
         cause = params.cause.orNull,
-        errorClass = params.errorClass,
-        messageParameters = params.messageParameters,
-        context = params.queryContext)))
+        errorClass = Option(getErrorClassOrFallback(params)),
+        messageParameters = errorParamsToMessageParameters(params),
+        context = params.queryContext,
+        sqlState = getSqlStateOrFallback(params))))
 
   /**
    * errorsToThrowable reconstructs the exception based on a list of protobuf 
messages
@@ -301,6 +393,10 @@ private[client] object GrpcExceptionConverter {
       Some(error.getSparkThrowable.getErrorClass)
     } else None
 
+    val sqlState = if (error.hasSparkThrowable && 
error.getSparkThrowable.hasSqlState) {
+      Some(error.getSparkThrowable.getSqlState)
+    } else None
+
     val messageParameters = if (error.hasSparkThrowable) {
       error.getSparkThrowable.getMessageParametersMap.asScala.toMap
     } else Map.empty[String, String]
@@ -328,7 +424,8 @@ private[client] object GrpcExceptionConverter {
         cause = causeOpt,
         errorClass = errorClass,
         messageParameters = messageParameters,
-        queryContext = queryContext))
+        queryContext = queryContext,
+        sqlState = sqlState))
 
     if (!error.getStackTraceList.isEmpty) {
       exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { 
stackTraceElement =>
@@ -352,6 +449,7 @@ private[client] object GrpcExceptionConverter {
     val classes =
       JsonMethods.parse(info.getMetadataOrDefault("classes", 
"[]")).extract[Array[String]]
     val errorClass = info.getMetadataOrDefault("errorClass", null)
+    val sqlState = info.getMetadataOrDefault("sqlState", null)
     val builder = FetchErrorDetailsResponse.Error
       .newBuilder()
       .setMessage(message)
@@ -361,12 +459,16 @@ private[client] object GrpcExceptionConverter {
       val messageParameters = JsonMethods
         .parse(info.getMetadataOrDefault("messageParameters", "{}"))
         .extract[Map[String, String]]
-      builder.setSparkThrowable(
-        FetchErrorDetailsResponse.SparkThrowable
-          .newBuilder()
-          .setErrorClass(errorClass)
-          .putAllMessageParameters(messageParameters.asJava)
-          .build())
+      val sparkThrowableBuilder = FetchErrorDetailsResponse.SparkThrowable
+        .newBuilder()
+        .setErrorClass(errorClass)
+        .putAllMessageParameters(messageParameters.asJava)
+
+      if (sqlState != null) {
+        sparkThrowableBuilder.setSqlState(sqlState)
+      }
+
+      builder.setSparkThrowable(sparkThrowableBuilder.build())
     }
 
     errorsToThrowable(0, Seq(builder.build()))
@@ -384,8 +486,12 @@ private[client] object GrpcExceptionConverter {
    *   params.
    */
   private def errorParamsToMessageParameters(params: ErrorParams): Map[String, 
String] =
-    params.errorClass match {
-      case Some(_) => params.messageParameters
-      case None => Map("message" -> params.message)
+    resolveParams(params).errorClass match {
+      case Some("CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE") =>
+        Map("message" -> params.message)
+      case Some(_) =>
+        params.messageParameters
+      case None =>
+        Map("message" -> params.message)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to