This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1f4d11e1340 [SPARK-55275] Add InvalidPlanInput sql states for 
sql/connect
c1f4d11e1340 is described below

commit c1f4d11e13401a372443e69aefa66cb1fc37279d
Author: Garland Zhang <[email protected]>
AuthorDate: Thu Mar 12 11:18:45 2026 -0400

    [SPARK-55275] Add InvalidPlanInput sql states for sql/connect
    
    ### What changes were proposed in this pull request?
    
      Add 65 new subclasses to the existing `CONNECT_INVALID_PLAN` error
      condition, covering all `InvalidPlanInput` throw sites across the
      Spark Connect layer (`InvalidInputErrors`, `DataTypeProtoConverter`,
      `LiteralValueProtoConverter`, `SparkConnectAnalyzeHandler`,
      `SessionHolder`, `SparkConnectPlanner`).
    
      ### Why are the changes needed?
    
      `InvalidPlanInput` exceptions previously defaulted to `INTERNAL_ERROR`
      (SQL state `XX000`), making plan-input validation failures
      indistinguishable from genuine internal errors. They now report SQL
      state `56K00` instead.
    
      ### Does this PR introduce _any_ user-facing change?
    
      Yes. `InvalidPlanInput` exceptions now carry SQL state `56K00` and
      structured error conditions (`CONNECT_INVALID_PLAN.<SUBCLASS>`)
      instead of `INTERNAL_ERROR` / `XX000`.
    
      ### How was this patch tested?
    
      Updated `InvalidInputErrorsSuite` to assert new error conditions and 
`getSqlState() == "56K00"`. All 5 tests pass.
      `IllegalStateErrorsSuite` (17 tests) unchanged and passing.
    
      ### Was this patch authored or co-authored using generative AI tooling?
    
      Generated-by: Claude Sonnet 4.6
    
    Closes #54652 from garlandz-db/SPARK-55275_invalid.
    
    Authored-by: Garland Zhang <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../src/main/resources/error/error-conditions.json | 330 +++++++++++++++++++++
 .../connect/common/DataTypeProtoConverter.scala    |  15 +-
 .../common/LiteralValueProtoConverter.scala        |  38 ++-
 .../sql/connect/planner/InvalidInputErrors.scala   | 166 +++++++----
 .../sql/connect/planner/SparkConnectPlanner.scala  |   4 +-
 .../spark/sql/connect/service/SessionHolder.scala  |   4 +-
 .../service/SparkConnectAnalyzeHandler.scala       |   5 +-
 .../connect/planner/InvalidInputErrorsSuite.scala  |  61 +++-
 8 files changed, 533 insertions(+), 90 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 38be710e9865..2eefccd3825e 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -929,6 +929,41 @@
       "The Spark Connect plan is invalid."
     ],
     "subClass" : {
+      "AGGREGATE_NEEDS_PLAN_INPUT" : {
+        "message" : [
+          "Aggregate needs a plan input"
+        ]
+      },
+      "AGGREGATE_WITH_PIVOT_REQUIRES_PIVOT" : {
+        "message" : [
+          "Aggregate with GROUP_TYPE_PIVOT requires a Pivot"
+        ]
+      },
+      "ALIAS_WITH_MULTIPLE_IDENTIFIERS_AND_METADATA" : {
+        "message" : [
+          "Alias expressions with more than 1 identifier must not use optional 
metadata."
+        ]
+      },
+      "ARRAY_LITERAL_MISSING_DATA_TYPE" : {
+        "message" : [
+          "Data type information is missing in the array literal."
+        ]
+      },
+      "ARRAY_LITERAL_NOT_SET" : {
+        "message" : [
+          "Array literal is not set."
+        ]
+      },
+      "ASSERTION_FAILURE" : {
+        "message" : [
+          "<message>"
+        ]
+      },
+      "CANNOT_FIND_CACHED_LOCAL_RELATION" : {
+        "message" : [
+          "Cannot find a cached local relation for hash: <hash>"
+        ]
+      },
       "CANNOT_PARSE" : {
         "message" : [
           "Cannot decompress or parse the input plan (<errorMsg>)",
@@ -936,12 +971,307 @@
           "To disable plan compression, set 
'spark.connect.session.planCompression.threshold' to -1."
         ]
       },
+      "CHUNKED_CACHED_LOCAL_RELATION_WITHOUT_DATA" : {
+        "message" : [
+          "ChunkedCachedLocalRelation should contain data."
+        ]
+      },
+      "DATAFRAME_NOT_FOUND" : {
+        "message" : [
+          "No DataFrame with id <dfId> is found in the session <sessionId>"
+        ]
+      },
+      "DATA_TYPE_UNSUPPORTED_CATALYST_TO_PROTO" : {
+        "message" : [
+          "Does not support convert <typeName> to connect proto types."
+        ]
+      },
+      "DATA_TYPE_UNSUPPORTED_PROTO_TO_CATALYST" : {
+        "message" : [
+          "Does not support convert <kindCase> to catalyst types."
+        ]
+      },
+      "DEDUPLICATE_ALL_COLUMNS_AND_SUBSET" : {
+        "message" : [
+          "Cannot deduplicate on both all columns and a subset of columns"
+        ]
+      },
+      "DEDUPLICATE_NEEDS_INPUT" : {
+        "message" : [
+          "Deduplicate needs a plan input"
+        ]
+      },
+      "DEDUPLICATE_REQUIRES_COLUMNS_OR_ALL" : {
+        "message" : [
+          "Deduplicate requires to either deduplicate on all columns or a 
subset of columns"
+        ]
+      },
+      "EXCEPT_DOES_NOT_SUPPORT_UNION_BY_NAME" : {
+        "message" : [
+          "Except does not support union_by_name"
+        ]
+      },
+      "EXPECTED_NULL_VALUE" : {
+        "message" : [
+          "Expected null value, but got <literalTypeCase>"
+        ]
+      },
+      "EXPECTING_SCALA_UDF" : {
+        "message" : [
+          "Expecting a Scala UDF, but get <exprType>"
+        ]
+      },
+      "FIELD_CANNOT_BE_EMPTY" : {
+        "message" : [
+          "<fieldName> in <fullName> cannot be empty"
+        ]
+      },
+      "FUNCTION_EVAL_TYPE_NOT_SUPPORTED" : {
+        "message" : [
+          "Function with EvalType: <evalType> is not supported"
+        ]
+      },
+      "GROUPING_EXPRESSION_ABSENT" : {
+        "message" : [
+          "The grouping expression cannot be absent for KeyValueGroupedDataset"
+        ]
+      },
+      "INCOMPATIBLE_LITERAL_DATA_TYPE" : {
+        "message" : [
+          "Incompatible data type <dataTypeKindCase> for literal 
<literalTypeCase>"
+        ]
+      },
+      "INPUT_DATA_NO_SCHEMA" : {
+        "message" : [
+          "Input data for LocalRelation does not produce a schema."
+        ]
+      },
+      "INTERSECT_DOES_NOT_SUPPORT_UNION_BY_NAME" : {
+        "message" : [
+          "Intersect does not support union_by_name"
+        ]
+      },
+      "INVALID_ENUM" : {
+        "message" : [
+          "This enum value of <fullName> is invalid: <name>(<number>)"
+        ]
+      },
+      "INVALID_JDBC_PARAMS" : {
+        "message" : [
+          "Invalid jdbc params, please specify jdbc url and table."
+        ]
+      },
+      "INVALID_ONE_OF_FIELD_NOT_SET" : {
+        "message" : [
+          "This oneOf field in <fullName> is not set: <name>"
+        ]
+      },
+      "INVALID_ONE_OF_FIELD_NOT_SUPPORTED" : {
+        "message" : [
+          "This oneOf field message in <fullName> is not supported: 
<name>(<number>)"
+        ]
+      },
+      "INVALID_SCHEMA_NON_STRUCT_TYPE" : {
+        "message" : [
+          "The input schema <inputSchema> is not a struct type, but got 
<dataType>."
+        ]
+      },
+      "INVALID_SQL_WITH_REFERENCES" : {
+        "message" : [
+          "<query> is not a valid relation for SQL with references"
+        ]
+      },
+      "INVALID_WITH_RELATION_REFERENCE" : {
+        "message" : [
+          "Invalid WithRelation reference"
+        ]
+      },
+      "LAMBDA_FUNCTION_ARGUMENT_COUNT_INVALID" : {
+        "message" : [
+          "LambdaFunction requires 1 ~ 3 arguments, but got <got> ones!"
+        ]
+      },
+      "LOCAL_RELATION_CHUNK_SIZE_LIMIT_EXCEEDED" : {
+        "message" : [
+          "One of cached local relation chunks exceeded the limit of <limit> 
bytes."
+        ]
+      },
+      "LOCAL_RELATION_SIZE_LIMIT_EXCEEDED" : {
+        "message" : [
+          "Cached local relation size (<actualSize> bytes) exceeds the limit 
(<limit> bytes)."
+        ]
+      },
+      "LOWER_BOUND_REQUIRED_IN_WINDOW_FRAME" : {
+        "message" : [
+          "LowerBound is required in WindowFrame"
+        ]
+      },
+      "MAP_LITERAL_MISSING_DATA_TYPE" : {
+        "message" : [
+          "Data type information is missing in the map literal."
+        ]
+      },
+      "MAP_LITERAL_NOT_SET" : {
+        "message" : [
+          "Map literal is not set."
+        ]
+      },
+      "MULTIPLE_PATHS_NOT_SUPPORTED_FOR_STREAMING_SOURCE" : {
+        "message" : [
+          "Multiple paths are not supported for streaming source"
+        ]
+      },
+      "NA_FILL_VALUES_EMPTY" : {
+        "message" : [
+          "values must contains at least 1 item!"
+        ]
+      },
+      "NA_FILL_VALUES_LENGTH_MISMATCH" : {
+        "message" : [
+          "When values contains more than 1 items, values and cols should have 
the same length!"
+        ]
+      },
+      "NOT_FOUND_CACHED_LOCAL_RELATION" : {
+        "message" : [
+          "Not found any cached local relation with the hash: <hash> in the 
session with sessionUUID <sessionUUID>."
+        ]
+      },
+      "NOT_FOUND_CHUNKED_CACHED_LOCAL_RELATION" : {
+        "message" : [
+          "Not found chunked cached local relation block with the hash: <hash> 
in the session with sessionUUID <sessionUUID>."
+        ]
+      },
+      "NO_HANDLER_FOR_EXTENSION" : {
+        "message" : [
+          "No handler found for extension type: <extensionTypeUrl>"
+        ]
+      },
       "PLAN_SIZE_LARGER_THAN_MAX" : {
         "message" : [
           "The plan size is larger than max (<planSize> vs. <maxPlanSize>)",
           "This typically occurs when building very complex queries with many 
operations, large literals, or deeply nested expressions.",
           "Consider splitting the query into smaller parts using temporary 
views for intermediate results or reducing the number of operations."
         ]
+      },
+      "PREDICATES_NOT_SUPPORTED_FOR_DATA_SOURCE" : {
+        "message" : [
+          "Predicates are not supported for <format> data sources."
+        ]
+      },
+      "PYTHON_UDT_MISSING_FIELDS" : {
+        "message" : [
+          "PythonUserDefinedType requires all the three fields: python_class, 
serialized_python_class and sql_type."
+        ]
+      },
+      "REDUCE_SHOULD_CARRY_SCALAR_SCALA_UDF" : {
+        "message" : [
+          "reduce should carry a scalar scala udf, but got <got>"
+        ]
+      },
+      "ROW_NOT_SUPPORTED_FOR_UDF" : {
+        "message" : [
+          "Row is not a supported <errorType> type for this UDF."
+        ]
+      },
+      "SCHEMA_REQUIRED_FOR_LOCAL_RELATION" : {
+        "message" : [
+          "Schema for LocalRelation is required when the input data is not 
provided."
+        ]
+      },
+      "SET_OPERATION_MUST_HAVE_TWO_INPUTS" : {
+        "message" : [
+          "Set operation must have 2 inputs"
+        ]
+      },
+      "SQL_COMMAND_EXPECTS_SQL_OR_WITH_RELATIONS" : {
+        "message" : [
+          "SQL command expects either a SQL or a WithRelations, but got 
<other>"
+        ]
+      },
+      "STREAMING_QUERY_NOT_FOUND" : {
+        "message" : [
+          "Streaming query <id> is not found"
+        ]
+      },
+      "STREAMING_QUERY_RUN_ID_MISMATCH" : {
+        "message" : [
+          "Run id mismatch for query id <id>. Run id in the request <runId> 
does not match one on the server <serverRunId>. The query might have restarted."
+        ]
+      },
+      "STRUCT_LITERAL_MISSING_DATA_TYPE" : {
+        "message" : [
+          "Data type information is missing in the struct literal."
+        ]
+      },
+      "STRUCT_LITERAL_NOT_SET" : {
+        "message" : [
+          "Struct literal is not set."
+        ]
+      },
+      "UDT_TYPE_FIELD_INVALID" : {
+        "message" : [
+          "UserDefinedType requires the 'type' field to be 'udt', but got 
'<udtType>'."
+        ]
+      },
+      "UNION_BY_NAME_ALLOW_MISSING_COL_REQUIRES_BY_NAME" : {
+        "message" : [
+          "UnionByName `allowMissingCol` can be true only if `byName` is true."
+        ]
+      },
+      "UNKNOWN_ANALYZE_METHOD" : {
+        "message" : [
+          "Unknown Analyze Method <other>!"
+        ]
+      },
+      "UNRESOLVED_COLUMN_AMONG_FIELD_NAMES" : {
+        "message" : [
+          "Cannot resolve column name \"<colName>\" among (<fieldNames>)."
+        ]
+      },
+      "UNRESOLVED_NAMED_LAMBDA_VARIABLE_REQUIRES_NAME_PART" : {
+        "message" : [
+          "UnresolvedNamedLambdaVariable requires at least one name part!"
+        ]
+      },
+      "UNRESOLVED_STAR_TARGET_INVALID" : {
+        "message" : [
+          "UnresolvedStar requires a unparsed target ending with '.*', but got 
<target>."
+        ]
+      },
+      "UNRESOLVED_STAR_WITH_BOTH_TARGET_AND_PLAN_ID" : {
+        "message" : [
+          "UnresolvedStar with both target and plan id is not supported."
+        ]
+      },
+      "UNSUPPORTED_LITERAL_TYPE" : {
+        "message" : [
+          "Unsupported Literal Type: <typeInfo>"
+        ]
+      },
+      "UNSUPPORTED_USER_DEFINED_FUNCTION_IMPLEMENTATION" : {
+        "message" : [
+          "Unsupported UserDefinedFunction implementation: <clazz>"
+        ]
+      },
+      "UPPER_BOUND_REQUIRED_IN_WINDOW_FRAME" : {
+        "message" : [
+          "UpperBound is required in WindowFrame"
+        ]
+      },
+      "USING_COLUMNS_OR_JOIN_CONDITION_SET_IN_JOIN" : {
+        "message" : [
+          "Using columns or join conditions cannot be set at the same time in 
Join"
+        ]
+      },
+      "WINDOW_FUNCTION_REQUIRED" : {
+        "message" : [
+          "WindowFunction is required in WindowExpression"
+        ]
+      },
+      "WITH_COLUMNS_REQUIRE_SINGLE_NAME_PART" : {
+        "message" : [
+          "WithColumns require column name only contains one name part, but 
got <got>"
+        ]
       }
     },
     "sqlState" : "56K00"
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
index 0500ca478dad..ceccf780f586 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
@@ -89,7 +89,9 @@ object DataTypeProtoConverter {
       case proto.DataType.KindCase.UDT => toCatalystUDT(t.getUdt)
 
       case _ =>
-        throw InvalidPlanInput(s"Does not support convert ${t.getKindCase} to 
catalyst types.")
+        throw InvalidPlanInput(
+          "CONNECT_INVALID_PLAN.DATA_TYPE_UNSUPPORTED_PROTO_TO_CATALYST",
+          Map("kindCase" -> t.getKindCase.toString))
     }
   }
 
@@ -147,7 +149,8 @@ object DataTypeProtoConverter {
   private def toCatalystUDT(t: proto.DataType.UDT): UserDefinedType[_] = {
     if (t.getType != "udt") {
       throw InvalidPlanInput(
-        s"""UserDefinedType requires the 'type' field to be 'udt', but got 
'${t.getType}'.""")
+        "CONNECT_INVALID_PLAN.UDT_TYPE_FIELD_INVALID",
+        Map("udtType" -> t.getType))
     }
 
     if (t.hasJvmClass) {
@@ -157,9 +160,7 @@ object DataTypeProtoConverter {
         .newInstance()
     } else {
       if (!t.hasPythonClass || !t.hasSerializedPythonClass || !t.hasSqlType) {
-        throw InvalidPlanInput(
-          "PythonUserDefinedType requires all the three fields: " +
-            "python_class, serialized_python_class and sql_type.")
+        throw 
InvalidPlanInput("CONNECT_INVALID_PLAN.PYTHON_UDT_MISSING_FIELDS", Map.empty)
       }
 
       new PythonUserDefinedType(
@@ -389,7 +390,9 @@ object DataTypeProtoConverter {
         }
 
       case _ =>
-        throw InvalidPlanInput(s"Does not support convert ${t.typeName} to 
connect proto types.")
+        throw InvalidPlanInput(
+          "CONNECT_INVALID_PLAN.DATA_TYPE_UNSUPPORTED_CATALYST_TO_PROTO",
+          Map("typeName" -> t.typeName))
     }
   }
 }
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala
index 63c43f956d78..026f5441c6ca 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala
@@ -387,7 +387,10 @@ object LiteralValueProtoConverter {
   private def getScalaConverter(dataType: proto.DataType): 
proto.Expression.Literal => Any = {
     val converter: proto.Expression.Literal => Any = dataType.getKindCase 
match {
       case proto.DataType.KindCase.NULL =>
-        v => throw InvalidPlanInput(s"Expected null value, but got 
${v.getLiteralTypeCase}")
+        v =>
+          throw InvalidPlanInput(
+            "CONNECT_INVALID_PLAN.EXPECTED_NULL_VALUE",
+            Map("literalTypeCase" -> v.getLiteralTypeCase.toString))
       case proto.DataType.KindCase.SHORT => v => v.getShort.toShort
       case proto.DataType.KindCase.INTEGER => v => v.getInteger
       case proto.DataType.KindCase.LONG => v => v.getLong
@@ -421,7 +424,9 @@ object LiteralValueProtoConverter {
       case proto.DataType.KindCase.STRUCT =>
         v => toScalaStructInternal(v, dataType.getStruct)
       case _ =>
-        throw InvalidPlanInput(s"Unsupported Literal Type: 
${dataType.getKindCase}")
+        throw InvalidPlanInput(
+          "CONNECT_INVALID_PLAN.UNSUPPORTED_LITERAL_TYPE",
+          Map("typeInfo" -> dataType.getKindCase.toString))
     }
     v => if (v.hasNull) null else converter(v)
   }
@@ -558,7 +563,9 @@ object LiteralValueProtoConverter {
                   .setContainsNull(true)
                   .build())
             } else {
-              throw InvalidPlanInput("Data type information is missing in the 
array literal.")
+              throw InvalidPlanInput(
+                "CONNECT_INVALID_PLAN.ARRAY_LITERAL_MISSING_DATA_TYPE",
+                Map.empty)
             }
           case proto.Expression.Literal.LiteralTypeCase.MAP =>
             if (literal.getMap.hasKeyType && literal.getMap.hasValueType) {
@@ -570,18 +577,23 @@ object LiteralValueProtoConverter {
                   .setValueContainsNull(true)
                   .build())
             } else {
-              throw InvalidPlanInput("Data type information is missing in the 
map literal.")
+              throw InvalidPlanInput(
+                "CONNECT_INVALID_PLAN.MAP_LITERAL_MISSING_DATA_TYPE",
+                Map.empty)
             }
           case proto.Expression.Literal.LiteralTypeCase.STRUCT =>
             if (literal.getStruct.hasStructType) {
               builder.setStruct(literal.getStruct.getStructType.getStruct)
             } else {
-              throw InvalidPlanInput("Data type information is missing in the 
struct literal.")
+              throw InvalidPlanInput(
+                "CONNECT_INVALID_PLAN.STRUCT_LITERAL_MISSING_DATA_TYPE",
+                Map.empty)
             }
           case _ =>
+            val literalCase = literal.getLiteralTypeCase
             throw InvalidPlanInput(
-              s"Unsupported Literal Type: ${literal.getLiteralTypeCase.name}" +
-                s"(${literal.getLiteralTypeCase.getNumber})")
+              "CONNECT_INVALID_PLAN.UNSUPPORTED_LITERAL_TYPE",
+              Map("typeInfo" -> 
s"${literalCase.name}(${literalCase.getNumber})"))
         }
         builder.build()
       }
@@ -589,8 +601,10 @@ object LiteralValueProtoConverter {
 
     if (!isCompatible(literal.getLiteralTypeCase, dataType.getKindCase)) {
       throw InvalidPlanInput(
-        s"Incompatible data type ${dataType.getKindCase} " +
-          s"for literal ${literal.getLiteralTypeCase}")
+        "CONNECT_INVALID_PLAN.INCOMPATIBLE_LITERAL_DATA_TYPE",
+        Map(
+          "dataTypeKindCase" -> dataType.getKindCase.toString,
+          "literalTypeCase" -> literal.getLiteralTypeCase.toString))
     }
 
     dataType
@@ -600,7 +614,7 @@ object LiteralValueProtoConverter {
       literal: proto.Expression.Literal,
       arrayType: proto.DataType.Array): Array[_] = {
     if (!literal.hasArray) {
-      throw InvalidPlanInput("Array literal is not set.")
+      throw InvalidPlanInput("CONNECT_INVALID_PLAN.ARRAY_LITERAL_NOT_SET", 
Map.empty)
     }
     val array = literal.getArray
     def makeArrayData[T](converter: proto.Expression.Literal => T)(implicit
@@ -620,7 +634,7 @@ object LiteralValueProtoConverter {
       literal: proto.Expression.Literal,
       mapType: proto.DataType.Map): mutable.Map[_, _] = {
     if (!literal.hasMap) {
-      throw InvalidPlanInput("Map literal is not set.")
+      throw InvalidPlanInput("CONNECT_INVALID_PLAN.MAP_LITERAL_NOT_SET", 
Map.empty)
     }
     val map = literal.getMap
     def makeMapData[K, V](
@@ -646,7 +660,7 @@ object LiteralValueProtoConverter {
       literal: proto.Expression.Literal,
       structType: proto.DataType.Struct): Any = {
     if (!literal.hasStruct) {
-      throw InvalidPlanInput("Struct literal is not set.")
+      throw InvalidPlanInput("CONNECT_INVALID_PLAN.STRUCT_LITERAL_NOT_SET", 
Map.empty)
     }
     val struct = literal.getStruct
     val structData = Array.tabulate(struct.getElementsCount) { i =>
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
index f4a6913d1eab..cdba6c825332 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -30,96 +30,115 @@ import org.apache.spark.sql.types.DataType
 
 object InvalidInputErrors {
 
-  def noHandlerFoundForExtension(extensionTypeUrl: String): InvalidPlanInput = 
{
-    InvalidPlanInput(s"No handler found for extension type: $extensionTypeUrl")
-  }
+  def noHandlerFoundForExtension(extensionTypeUrl: String): InvalidPlanInput =
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.NO_HANDLER_FOR_EXTENSION",
+      Map("extensionTypeUrl" -> extensionTypeUrl))
 
   def invalidSQLWithReferences(query: proto.WithRelations): InvalidPlanInput =
-    InvalidPlanInput(s"$query is not a valid relation for SQL with references")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.INVALID_SQL_WITH_REFERENCES",
+      Map("query" -> query.toString))
 
   def naFillValuesEmpty(): InvalidPlanInput =
-    InvalidPlanInput("values must contains at least 1 item!")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.NA_FILL_VALUES_EMPTY", Map.empty)
 
   def naFillValuesLengthMismatch(): InvalidPlanInput =
-    InvalidPlanInput(
-      "When values contains more than 1 items, values and cols should have the 
same length!")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.NA_FILL_VALUES_LENGTH_MISMATCH", 
Map.empty)
 
   def deduplicateNeedsInput(): InvalidPlanInput =
-    InvalidPlanInput("Deduplicate needs a plan input")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.DEDUPLICATE_NEEDS_INPUT", Map.empty)
 
   def deduplicateAllColumnsAndSubset(): InvalidPlanInput =
-    InvalidPlanInput("Cannot deduplicate on both all columns and a subset of 
columns")
+    
InvalidPlanInput("CONNECT_INVALID_PLAN.DEDUPLICATE_ALL_COLUMNS_AND_SUBSET", 
Map.empty)
 
   def deduplicateRequiresColumnsOrAll(): InvalidPlanInput =
-    InvalidPlanInput(
-      "Deduplicate requires to either deduplicate on all columns or a subset 
of columns")
+    
InvalidPlanInput("CONNECT_INVALID_PLAN.DEDUPLICATE_REQUIRES_COLUMNS_OR_ALL", 
Map.empty)
 
   def invalidDeduplicateColumn(colName: String, fieldNames: String): 
InvalidPlanInput =
     InvalidPlanInput(
-      "UNRESOLVED_COLUMN_AMONG_FIELD_NAMES",
+      "CONNECT_INVALID_PLAN.UNRESOLVED_COLUMN_AMONG_FIELD_NAMES",
       Map("colName" -> colName, "fieldNames" -> fieldNames))
 
   def functionEvalTypeNotSupported(evalType: Int): InvalidPlanInput =
-    InvalidPlanInput(s"Function with EvalType: $evalType is not supported")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.FUNCTION_EVAL_TYPE_NOT_SUPPORTED",
+      Map("evalType" -> evalType.toString))
 
   def groupingExpressionAbsentForKeyValueGroupedDataset(): InvalidPlanInput =
-    InvalidPlanInput("The grouping expression cannot be absent for 
KeyValueGroupedDataset")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.GROUPING_EXPRESSION_ABSENT", 
Map.empty)
 
   def expectingScalaUdfButGot(exprType: proto.Expression.ExprTypeCase): 
InvalidPlanInput =
-    InvalidPlanInput(s"Expecting a Scala UDF, but get $exprType")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.EXPECTING_SCALA_UDF",
+      Map("exprType" -> exprType.toString))
 
   def rowNotSupportedForUdf(errorType: String): InvalidPlanInput =
-    InvalidPlanInput(s"Row is not a supported $errorType type for this UDF.")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.ROW_NOT_SUPPORTED_FOR_UDF",
+      Map("errorType" -> errorType))
 
   def notFoundCachedLocalRelation(hash: String, sessionUUID: String): 
InvalidPlanInput =
     InvalidPlanInput(
-      s"Not found any cached local relation with the hash: " +
-        s"$hash in the session with sessionUUID $sessionUUID.")
+      "CONNECT_INVALID_PLAN.NOT_FOUND_CACHED_LOCAL_RELATION",
+      Map("hash" -> hash, "sessionUUID" -> sessionUUID))
 
   def notFoundChunkedCachedLocalRelationBlock(
       hash: String,
       sessionUUID: String): InvalidPlanInput =
     InvalidPlanInput(
-      s"Not found chunked cached local relation block with the hash: " +
-        s"$hash in the session with sessionUUID $sessionUUID.")
+      "CONNECT_INVALID_PLAN.NOT_FOUND_CHUNKED_CACHED_LOCAL_RELATION",
+      Map("hash" -> hash, "sessionUUID" -> sessionUUID))
 
   def localRelationSizeLimitExceeded(actualSize: Long, limit: Long): 
InvalidPlanInput =
     InvalidPlanInput(
-      s"Cached local relation size ($actualSize bytes) exceeds the limit 
($limit bytes).")
+      "CONNECT_INVALID_PLAN.LOCAL_RELATION_SIZE_LIMIT_EXCEEDED",
+      Map("actualSize" -> actualSize.toString, "limit" -> limit.toString))
 
   def localRelationChunkSizeLimitExceeded(limit: Long): InvalidPlanInput =
-    InvalidPlanInput(s"One of cached local relation chunks exceeded the limit 
of $limit bytes.")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.LOCAL_RELATION_CHUNK_SIZE_LIMIT_EXCEEDED",
+      Map("limit" -> limit.toString))
 
   def withColumnsRequireSingleNamePart(got: String): InvalidPlanInput =
-    InvalidPlanInput(s"WithColumns require column name only contains one name 
part, but got $got")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.WITH_COLUMNS_REQUIRE_SINGLE_NAME_PART",
+      Map("got" -> got))
 
   def inputDataForLocalRelationNoSchema(): InvalidPlanInput =
-    InvalidPlanInput("Input data for LocalRelation does not produce a schema.")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.INPUT_DATA_NO_SCHEMA", Map.empty)
 
   def chunkedCachedLocalRelationWithoutData(): InvalidPlanInput =
-    InvalidPlanInput("ChunkedCachedLocalRelation should contain data.")
+    
InvalidPlanInput("CONNECT_INVALID_PLAN.CHUNKED_CACHED_LOCAL_RELATION_WITHOUT_DATA",
 Map.empty)
 
   def schemaRequiredForLocalRelation(): InvalidPlanInput =
-    InvalidPlanInput("Schema for LocalRelation is required when the input data 
is not provided.")
+    
InvalidPlanInput("CONNECT_INVALID_PLAN.SCHEMA_REQUIRED_FOR_LOCAL_RELATION", 
Map.empty)
 
   def invalidSchemaStringNonStructType(schema: String, dataType: DataType): 
InvalidPlanInput =
     InvalidPlanInput(
-      "INVALID_SCHEMA.NON_STRUCT_TYPE",
+      "CONNECT_INVALID_PLAN.INVALID_SCHEMA_NON_STRUCT_TYPE",
       Map("inputSchema" -> quoteByDefault(schema), "dataType" -> 
toSQLType(dataType)))
 
   def invalidJdbcParams(): InvalidPlanInput =
-    InvalidPlanInput("Invalid jdbc params, please specify jdbc url and table.")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.INVALID_JDBC_PARAMS", Map.empty)
 
   def predicatesNotSupportedForDataSource(format: String): InvalidPlanInput =
-    InvalidPlanInput(s"Predicates are not supported for $format data sources.")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.PREDICATES_NOT_SUPPORTED_FOR_DATA_SOURCE",
+      Map("format" -> format))
 
   def multiplePathsNotSupportedForStreamingSource(): InvalidPlanInput =
-    InvalidPlanInput("Multiple paths are not supported for streaming source")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.MULTIPLE_PATHS_NOT_SUPPORTED_FOR_STREAMING_SOURCE",
+      Map.empty)
 
   def invalidEnum(protoEnum: Enum[_] with ProtocolMessageEnum): 
InvalidPlanInput =
     InvalidPlanInput(
-      s"This enum value of ${protoEnum.getDescriptorForType.getFullName}" +
-        s" is invalid: ${protoEnum.name()}(${protoEnum.getNumber})")
+      "CONNECT_INVALID_PLAN.INVALID_ENUM",
+      Map(
+        "fullName" -> protoEnum.getDescriptorForType.getFullName,
+        "name" -> protoEnum.name(),
+        "number" -> protoEnum.getNumber.toString))
 
   def invalidOneOfField(
       enumCase: Enum[_] with EnumLite,
@@ -127,90 +146,119 @@ object InvalidInputErrors {
     // If the oneOf field is not set, the enum number will be 0.
     if (enumCase.getNumber == 0) {
       InvalidPlanInput(
-        s"This oneOf field in ${descriptor.getFullName} is not set: ${enumCase.name()}")
+        "CONNECT_INVALID_PLAN.INVALID_ONE_OF_FIELD_NOT_SET",
+        Map("fullName" -> descriptor.getFullName, "name" -> enumCase.name()))
     } else {
       InvalidPlanInput(
-        s"This oneOf field message in ${descriptor.getFullName} is not supported: " +
-          s"${enumCase.name()}(${enumCase.getNumber})")
+        "CONNECT_INVALID_PLAN.INVALID_ONE_OF_FIELD_NOT_SUPPORTED",
+        Map(
+          "fullName" -> descriptor.getFullName,
+          "name" -> enumCase.name(),
+          "number" -> enumCase.getNumber.toString))
     }
   }
 
   def cannotBeEmpty(fieldName: String, descriptor: Descriptor): InvalidPlanInput =
-    InvalidPlanInput(s"$fieldName in ${descriptor.getFullName} cannot be empty")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.FIELD_CANNOT_BE_EMPTY",
+      Map("fieldName" -> fieldName, "fullName" -> descriptor.getFullName))
 
   def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput =
     InvalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
 
   def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput =
-    InvalidPlanInput(s"LambdaFunction requires 1 ~ 3 arguments, but got $got ones!")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.LAMBDA_FUNCTION_ARGUMENT_COUNT_INVALID",
+      Map("got" -> got.toString))
 
   def aliasWithMultipleIdentifiersAndMetadata(): InvalidPlanInput =
     InvalidPlanInput(
-      "Alias expressions with more than 1 identifier must not use optional metadata.")
+      "CONNECT_INVALID_PLAN.ALIAS_WITH_MULTIPLE_IDENTIFIERS_AND_METADATA",
+      Map.empty)
 
   def unresolvedStarTargetInvalid(target: String): InvalidPlanInput =
     InvalidPlanInput(
-      s"UnresolvedStar requires a unparsed target ending with '.*', but got $target.")
+      "CONNECT_INVALID_PLAN.UNRESOLVED_STAR_TARGET_INVALID",
+      Map("target" -> target))
 
   def unresolvedStarWithBothTargetAndPlanId(): InvalidPlanInput =
-    InvalidPlanInput("UnresolvedStar with both target and plan id is not supported.")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.UNRESOLVED_STAR_WITH_BOTH_TARGET_AND_PLAN_ID",
+      Map.empty)
 
   def windowFunctionRequired(): InvalidPlanInput =
-    InvalidPlanInput("WindowFunction is required in WindowExpression")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.WINDOW_FUNCTION_REQUIRED", Map.empty)
 
   def lowerBoundRequiredInWindowFrame(): InvalidPlanInput =
-    InvalidPlanInput("LowerBound is required in WindowFrame")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.LOWER_BOUND_REQUIRED_IN_WINDOW_FRAME", Map.empty)
 
   def upperBoundRequiredInWindowFrame(): InvalidPlanInput =
-    InvalidPlanInput("UpperBound is required in WindowFrame")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.UPPER_BOUND_REQUIRED_IN_WINDOW_FRAME", Map.empty)
 
   def setOperationMustHaveTwoInputs(): InvalidPlanInput =
-    InvalidPlanInput("Set operation must have 2 inputs")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.SET_OPERATION_MUST_HAVE_TWO_INPUTS", Map.empty)
 
   def exceptDoesNotSupportUnionByName(): InvalidPlanInput =
-    InvalidPlanInput("Except does not support union_by_name")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.EXCEPT_DOES_NOT_SUPPORT_UNION_BY_NAME", Map.empty)
 
   def intersectDoesNotSupportUnionByName(): InvalidPlanInput =
-    InvalidPlanInput("Intersect does not support union_by_name")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.INTERSECT_DOES_NOT_SUPPORT_UNION_BY_NAME", Map.empty)
 
   def aggregateNeedsPlanInput(): InvalidPlanInput =
-    InvalidPlanInput("Aggregate needs a plan input")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.AGGREGATE_NEEDS_PLAN_INPUT", Map.empty)
 
   def aggregateWithPivotRequiresPivot(): InvalidPlanInput =
-    InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a Pivot")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.AGGREGATE_WITH_PIVOT_REQUIRES_PIVOT", Map.empty)
 
   def invalidWithRelationReference(): InvalidPlanInput =
-    InvalidPlanInput("Invalid WithRelation reference")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.INVALID_WITH_RELATION_REFERENCE", Map.empty)
 
   def assertionFailure(message: String): InvalidPlanInput =
-    InvalidPlanInput(message)
+    InvalidPlanInput("CONNECT_INVALID_PLAN.ASSERTION_FAILURE", Map("message" -> message))
 
   def unresolvedNamedLambdaVariableRequiresNamePart(): InvalidPlanInput =
-    InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least one name part!")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.UNRESOLVED_NAMED_LAMBDA_VARIABLE_REQUIRES_NAME_PART",
+      Map.empty)
 
   def usingColumnsOrJoinConditionSetInJoin(): InvalidPlanInput =
-    InvalidPlanInput("Using columns or join conditions cannot be set at the same time in Join")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.USING_COLUMNS_OR_JOIN_CONDITION_SET_IN_JOIN",
+      Map.empty)
 
   def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): InvalidPlanInput =
-    InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, but got $other")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.SQL_COMMAND_EXPECTS_SQL_OR_WITH_RELATIONS",
+      Map("other" -> other.toString))
 
   def reduceShouldCarryScalarScalaUdf(got: mutable.Buffer[proto.Expression]): InvalidPlanInput =
-    InvalidPlanInput(s"reduce should carry a scalar scala udf, but got $got")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.REDUCE_SHOULD_CARRY_SCALAR_SCALA_UDF",
+      Map("got" -> got.toString))
 
   def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
-    InvalidPlanInput("UnionByName `allowMissingCol` can be true only if `byName` is true.")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.UNION_BY_NAME_ALLOW_MISSING_COL_REQUIRES_BY_NAME",
+      Map.empty)
 
   def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
-    InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.UNSUPPORTED_USER_DEFINED_FUNCTION_IMPLEMENTATION",
+      Map("clazz" -> clazz.toString))
 
   def streamingQueryRunIdMismatch(
       id: String,
       runId: String,
       serverRunId: String): InvalidPlanInput =
     InvalidPlanInput(
-      s"Run id mismatch for query id $id. Run id in the request $runId " +
-        s"does not match one on the server $serverRunId. The query might have restarted.")
+      "CONNECT_INVALID_PLAN.STREAMING_QUERY_RUN_ID_MISMATCH",
+      Map("id" -> id, "runId" -> runId, "serverRunId" -> serverRunId))
 
   def streamingQueryNotFound(id: String): InvalidPlanInput =
-    InvalidPlanInput(s"Streaming query $id is not found")
+    InvalidPlanInput("CONNECT_INVALID_PLAN.STREAMING_QUERY_NOT_FOUND", Map("id" -> id))
+
+  def cannotFindCachedLocalRelation(hash: String): InvalidPlanInput =
+    InvalidPlanInput(
+      "CONNECT_INVALID_PLAN.CANNOT_FIND_CACHED_LOCAL_RELATION",
+      Map("hash" -> hash))
 }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index ee8180d5e6f8..5da4b022942b 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -59,7 +59,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.classic.{Catalog, DataFrameWriter, Dataset, MergeIntoWriter, RelationalGroupedDataset, SparkSession, TypedAggUtils, UserDefinedFunctionUtils}
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
-import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, StreamingListenerPacket, UdfPacket}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, LiteralValueProtoConverter, StorageLevelProtoConverter, StreamingListenerPacket, UdfPacket}
 import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
 import org.apache.spark.sql.connect.ml.MLHandler
 import org.apache.spark.sql.connect.pipelines.PipelinesHandler
@@ -1332,7 +1332,7 @@ class SparkConnectPlanner(
   private def transformCachedLocalRelation(rel: proto.CachedLocalRelation): LogicalPlan = {
     val blockManager = session.sparkContext.env.blockManager
     val blockId = session.artifactManager.getCachedBlockId(rel.getHash).getOrElse {
-      throw InvalidPlanInput(s"Cannot find a cached local relation for hash: ${rel.getHash}")
+      throw InvalidInputErrors.cannotFindCachedLocalRelation(rel.getHash)
     }
     val bytes = blockManager.getLocalBytes(blockId)
     bytes
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index e5f9b7fe85f1..307416a659f7 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -465,7 +465,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def getDataFrameOrThrow(dfId: String): DataFrame = {
     Option(dataFrameCache.get(dfId))
       .getOrElse {
-        throw InvalidPlanInput(s"No DataFrame with id $dfId is found in the session $sessionId")
+        throw InvalidPlanInput(
+          "CONNECT_INVALID_PLAN.DATAFRAME_NOT_FOUND",
+          Map("dfId" -> dfId, "sessionId" -> sessionId))
       }
   }
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index 72029cafaa63..49450f938719 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -244,7 +244,10 @@ private[connect] class SparkConnectAnalyzeHandler(
       // RequestDecompressionInterceptor.decompressAnalyzePlanRequest() to handle
       // this case. The interceptor has a default case that throws UnsupportedOperationException
       // for unhandled cases, which will fail tests and block CI if you forget to update it.
-      case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
+      case other =>
+        throw InvalidPlanInput(
+          "CONNECT_INVALID_PLAN.UNKNOWN_ANALYZE_METHOD",
+          Map("other" -> other.toString))
     }
 
     builder
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala
index 07c377a77df5..768a532a7a82 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
-import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
+import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPlanTest}
 import org.apache.spark.sql.types._
 
 class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
@@ -75,7 +75,7 @@ class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
       }),
     TestCase(
       name = "Invalid schema string non struct type",
-      expectedErrorCondition = "INVALID_SCHEMA.NON_STRUCT_TYPE",
+      expectedErrorCondition = "CONNECT_INVALID_PLAN.INVALID_SCHEMA_NON_STRUCT_TYPE",
       expectedParameters = Map(
         "inputSchema" -> """"{"type":"array","elementType":"integer","containsNull":false}"""",
         "dataType" -> "\"ARRAY<INT>\""),
@@ -97,8 +97,8 @@ class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
       }),
     TestCase(
       name = "Deduplicate needs input",
-      expectedErrorCondition = "INTERNAL_ERROR",
-      expectedParameters = Map("message" -> "Deduplicate needs a plan input"),
+      expectedErrorCondition = "CONNECT_INVALID_PLAN.DEDUPLICATE_NEEDS_INPUT",
+      expectedParameters = Map.empty,
       invalidInput = {
         val deduplicate = proto.Deduplicate
           .newBuilder()
@@ -109,9 +109,9 @@ class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
       }),
     TestCase(
       name = "Catalog not set",
-      expectedErrorCondition = "INTERNAL_ERROR",
+      expectedErrorCondition = "CONNECT_INVALID_PLAN.INVALID_ONE_OF_FIELD_NOT_SET",
       expectedParameters =
-        Map("message" -> "This oneOf field in spark.connect.Catalog is not set: CATTYPE_NOT_SET"),
+        Map("fullName" -> "spark.connect.Catalog", "name" -> "CATTYPE_NOT_SET"),
       invalidInput = {
         val catalog = proto.Catalog
           .newBuilder()
@@ -126,15 +126,58 @@ class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
   // Run all test cases
   testCases.foreach { testCase =>
     test(s"${testCase.name}") {
+      val exception = intercept[InvalidPlanInput] {
+        transform(testCase.invalidInput)
+      }
       checkError(
-        exception = intercept[InvalidPlanInput] {
-          transform(testCase.invalidInput)
-        },
+        exception = exception,
         condition = testCase.expectedErrorCondition,
         parameters = testCase.expectedParameters)
+      if (testCase.expectedErrorCondition.startsWith("CONNECT_INVALID_PLAN")) {
+        assert(exception.getSqlState == "56K00")
+      }
     }
   }
 
+  test("noHandlerFoundForExtension") {
+    val ex = InvalidInputErrors.noHandlerFoundForExtension("foo.bar.Ext")
+    assert(ex.getCondition.contains("NO_HANDLER_FOR_EXTENSION"))
+    assert(ex.getMessage.contains("foo.bar.Ext"))
+    assert(ex.getSqlState == "56K00")
+  }
+
+  test("notFoundCachedLocalRelation") {
+    val ex = InvalidInputErrors.notFoundCachedLocalRelation("abc123", "sess-uuid")
+    assert(ex.getCondition.contains("NOT_FOUND_CACHED_LOCAL_RELATION"))
+    assert(ex.getMessage.contains("abc123"))
+    assert(ex.getMessage.contains("sess-uuid"))
+    assert(ex.getSqlState == "56K00")
+  }
+
+  test("localRelationSizeLimitExceeded") {
+    val ex = InvalidInputErrors.localRelationSizeLimitExceeded(1000L, 500L)
+    assert(ex.getCondition.contains("LOCAL_RELATION_SIZE_LIMIT_EXCEEDED"))
+    assert(ex.getMessage.contains("1000"))
+    assert(ex.getMessage.contains("500"))
+    assert(ex.getSqlState == "56K00")
+  }
+
+  test("functionEvalTypeNotSupported") {
+    val ex = InvalidInputErrors.functionEvalTypeNotSupported(42)
+    assert(ex.getCondition.contains("FUNCTION_EVAL_TYPE_NOT_SUPPORTED"))
+    assert(ex.getMessage.contains("42"))
+    assert(ex.getSqlState == "56K00")
+  }
+
+  test("streamingQueryRunIdMismatch") {
+    val ex = InvalidInputErrors.streamingQueryRunIdMismatch("q1", "run1", "run2")
+    assert(ex.getCondition.contains("STREAMING_QUERY_RUN_ID_MISMATCH"))
+    assert(ex.getMessage.contains("q1"))
+    assert(ex.getMessage.contains("run1"))
+    assert(ex.getMessage.contains("run2"))
+    assert(ex.getSqlState == "56K00")
+  }
+
   // Helper case class to define test cases
   case class TestCase(
       name: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to