This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 595f14881f9 [SPARK-40359][SQL] Migrate type check fails in CSV/JSON 
expressions to error classes
595f14881f9 is described below

commit 595f14881f92a09b03888dd69f69f9e7edf2a060
Author: Max Gekk <[email protected]>
AuthorDate: Thu Sep 22 12:57:23 2022 +0300

    [SPARK-40359][SQL] Migrate type check fails in CSV/JSON expressions to 
error classes
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to use error classes in the case of type check failure 
in CSV/JSON expressions.
    
    ### Why are the changes needed?
    Migration onto error classes unifies Spark SQL error messages, and improves 
searchability of errors.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "testOnly *.CsvFunctionsSuite"
    ```
    
    Closes #37902 from MaxGekk/type-check-fail-json-csv.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 core/src/main/resources/error/error-classes.json   | 30 ++++++++
 .../spark/sql/catalyst/expressions/ExprUtils.scala | 26 ++++---
 .../sql/catalyst/expressions/csvExpressions.scala  | 11 +--
 .../sql/catalyst/expressions/jsonExpressions.scala | 82 ++++++++++------------
 .../spark/sql/catalyst/json/JacksonUtils.scala     | 28 ++++----
 .../spark/sql/errors/QueryExecutionErrors.scala    |  5 --
 .../expressions/JsonExpressionsSuite.scala         |  6 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  5 +-
 .../sql/execution/datasources/json/JsonUtils.scala | 15 ++++
 .../spark/sql/streaming/DataStreamReader.scala     |  4 +-
 .../sql-tests/results/csv-functions.sql.out        | 32 ++++++++-
 .../sql-tests/results/json-functions.sql.out       | 32 ++++++++-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 14 ++--
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 12 +++-
 14 files changed, 205 insertions(+), 97 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 823780264cb..affbafdfc03 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -96,6 +96,11 @@
           "the binary operator requires the input type <inputType>, not 
<actualDataType>."
         ]
       },
+      "CANNOT_CONVERT_TO_JSON" : {
+        "message" : [
+          "Unable to convert column <name> of type <type> to JSON."
+        ]
+      },
       "CAST_WITHOUT_SUGGESTION" : {
         "message" : [
           "cannot cast <srcType> to <targetType>."
@@ -113,10 +118,35 @@
           "To convert values from <srcType> to <targetType>, you can use the 
functions <functionNames> instead."
         ]
       },
+      "INVALID_JSON_MAP_KEY_TYPE" : {
+        "message" : [
+          "Input schema <schema> can only contain STRING as a key type for a 
MAP."
+        ]
+      },
+      "INVALID_JSON_SCHEMA" : {
+        "message" : [
+          "Input schema <schema> must be a struct, an array or a map."
+        ]
+      },
+      "NON_FOLDABLE_INPUT" : {
+        "message" : [
+          "the input should be a foldable string expression and not null; 
however, got <inputExpr>."
+        ]
+      },
+      "NON_STRING_TYPE" : {
+        "message" : [
+          "all arguments must be strings."
+        ]
+      },
       "UNEXPECTED_INPUT_TYPE" : {
         "message" : [
           "parameter <paramIndex> requires <requiredType> type, however, 
<inputSql> is of <inputType> type."
         ]
+      },
+      "WRONG_NUM_PARAMS" : {
+        "message" : [
+          "wrong number of parameters: <actualNum>."
+        ]
       }
     }
   },
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 204b753d6e4..3e10b820aa6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.catalyst.expressions
 import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
 import java.util.Locale
 
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CharVarcharUtils}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase, 
QueryExecutionErrors}
 import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
-object ExprUtils {
+object ExprUtils extends QueryErrorsBase {
 
   def evalTypeExpr(exp: Expression): DataType = {
     if (exp.foldable) {
@@ -97,18 +99,22 @@ object ExprUtils {
 
   /**
    * Check if the schema is valid for Json
-   * @param schema
+   * @param schema The schema to check.
    * @return
-   *  None if the schema is valid
-   *  Some(msg) with the error message if the schema is not valid
+   *  `TypeCheckSuccess` if the schema is valid
+   *  `DataTypeMismatch` with an error message if the schema is not valid
    */
-  def checkJsonSchema(schema: DataType): Option[Throwable] =
-    if (schema.existsRecursively {
+  def checkJsonSchema(schema: DataType): TypeCheckResult = {
+    val isInvalid = schema.existsRecursively {
       case MapType(keyType, _, _) if keyType != StringType => true
       case _ => false
-    }) {
-      Some(QueryCompilationErrors.invalidJsonSchema(schema))
+    }
+    if (isInvalid) {
+      DataTypeMismatch(
+        errorSubClass = "INVALID_JSON_MAP_KEY_TYPE",
+        messageParameters = Map("schema" -> toSQLType(schema)))
     } else {
-      None
+      TypeCheckSuccess
     }
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 80cd0f7034d..8965e30e4e9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -23,10 +23,11 @@ import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.csv._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -166,7 +167,7 @@ case class CsvToStructs(
 case class SchemaOfCsv(
     child: Expression,
     options: Map[String, String])
-  extends UnaryExpression with CodegenFallback {
+  extends UnaryExpression with CodegenFallback with QueryErrorsBase {
 
   def this(child: Expression) = this(child, Map.empty[String, String])
 
@@ -185,9 +186,9 @@ case class SchemaOfCsv(
     if (child.foldable && csv != null) {
       super.checkInputDataTypes()
     } else {
-      TypeCheckResult.TypeCheckFailure(
-        "The input csv should be a foldable string expression and not null; " +
-        s"however, got ${child.sql}.")
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map("inputExpr" -> toSQLExpr(child)))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 4e6191de000..12da5933ece 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -27,11 +27,12 @@ import com.fasterxml.jackson.core.json.JsonReadFeature
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{JSON_TO_STRUCT, 
TreePattern}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -392,11 +393,15 @@ case class JsonTuple(children: Seq[Expression])
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length < 2) {
-      TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
+      DataTypeMismatch(
+        errorSubClass = "WRONG_NUM_PARAMS",
+        messageParameters = Map("actualNum" -> children.length.toString))
     } else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
       TypeCheckResult.TypeCheckSuccess
     } else {
-      TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
+      DataTypeMismatch(
+        errorSubClass = "NON_STRING_TYPE",
+        messageParameters = Map("funcName" -> prettyName))
     }
   }
 
@@ -535,8 +540,12 @@ case class JsonToStructs(
     options: Map[String, String],
     child: Expression,
     timeZoneId: Option[String] = None)
-  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback 
with ExpectsInputTypes
-    with NullIntolerant {
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
 
   // The JSON input data might be missing certain fields. We force the 
nullability
   // of the user-provided schema to avoid data corruptions. In particular, the 
parquet-mr encoder
@@ -566,13 +575,12 @@ case class JsonToStructs(
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | _: ArrayType | _: MapType =>
-      ExprUtils.checkJsonSchema(nullableSchema).map { e =>
-        TypeCheckResult.TypeCheckFailure(e.getMessage)
-      } getOrElse {
-        super.checkInputDataTypes()
-      }
-    case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
+      val checkResult = ExprUtils.checkJsonSchema(nullableSchema)
+      if (checkResult.isFailure) checkResult else super.checkInputDataTypes()
+    case _ =>
+      DataTypeMismatch(
+        errorSubClass = "INVALID_JSON_SCHEMA",
+        messageParameters = Map("schema" -> toSQLType(nullableSchema)))
   }
 
   // This converts parsed rows to the desired output by the given schema.
@@ -663,8 +671,13 @@ case class StructsToJson(
     options: Map[String, String],
     child: Expression,
     timeZoneId: Option[String] = None)
-  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback
-    with ExpectsInputTypes with NullIntolerant {
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
   override def nullable: Boolean = true
 
   def this(options: Map[String, String], child: Expression) = this(options, 
child, None)
@@ -716,33 +729,12 @@ case class StructsToJson(
   override def dataType: DataType = StringType
 
   override def checkInputDataTypes(): TypeCheckResult = inputSchema match {
-    case struct: StructType =>
-      try {
-        JacksonUtils.verifySchema(struct)
-        TypeCheckResult.TypeCheckSuccess
-      } catch {
-        case e: UnsupportedOperationException =>
-          TypeCheckResult.TypeCheckFailure(e.getMessage)
-      }
-    case map: MapType =>
-      try {
-        JacksonUtils.verifyType(prettyName, map)
-        TypeCheckResult.TypeCheckSuccess
-      } catch {
-        case e: UnsupportedOperationException =>
-          TypeCheckResult.TypeCheckFailure(e.getMessage)
-      }
-    case array: ArrayType =>
-      try {
-        JacksonUtils.verifyType(prettyName, array)
-        TypeCheckResult.TypeCheckSuccess
-      } catch {
-        case e: UnsupportedOperationException =>
-          TypeCheckResult.TypeCheckFailure(e.getMessage)
-      }
-    case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input type ${child.dataType.catalogString} must be a struct, array of 
structs or " +
-          "a map or array of map.")
+    case dt @ (_: StructType | _: MapType | _: ArrayType) =>
+      JacksonUtils.verifyType(prettyName, dt)
+    case _ =>
+      DataTypeMismatch(
+        errorSubClass = "INVALID_JSON_SCHEMA",
+        messageParameters = Map("schema" -> toSQLType(child.dataType)))
   }
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
@@ -775,7 +767,7 @@ case class StructsToJson(
 case class SchemaOfJson(
     child: Expression,
     options: Map[String, String])
-  extends UnaryExpression with CodegenFallback {
+  extends UnaryExpression with CodegenFallback with QueryErrorsBase {
 
   def this(child: Expression) = this(child, Map.empty[String, String])
 
@@ -803,9 +795,9 @@ case class SchemaOfJson(
     if (child.foldable && json != null) {
       super.checkInputDataTypes()
     } else {
-      TypeCheckResult.TypeCheckFailure(
-        "The input json should be a foldable string expression and not null; " 
+
-        s"however, got ${child.sql}.")
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map("inputExpr" -> toSQLExpr(child)))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 9286e29844b..b103eda6fc7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
+import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.types._
 
-object JacksonUtils {
+object JacksonUtils extends QueryErrorsBase {
   /**
    * Advance the parser until a null or a specific token is found
    */
@@ -33,11 +35,14 @@ object JacksonUtils {
     }
   }
 
-  def verifyType(name: String, dataType: DataType): Unit = {
+  def verifyType(name: String, dataType: DataType): TypeCheckResult = {
     dataType match {
-      case NullType | _: AtomicType | CalendarIntervalType =>
+      case NullType | _: AtomicType | CalendarIntervalType => TypeCheckSuccess
 
-      case st: StructType => st.foreach(field => verifyType(field.name, 
field.dataType))
+      case st: StructType =>
+        st.foldLeft(TypeCheckSuccess: TypeCheckResult) { case (currResult, 
field) =>
+          if (currResult.isFailure) currResult else verifyType(field.name, 
field.dataType)
+        }
 
       case at: ArrayType => verifyType(name, at.elementType)
 
@@ -48,14 +53,11 @@ object JacksonUtils {
       case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
 
       case _ =>
-        throw QueryExecutionErrors.cannotConvertColumnToJSONError(name, 
dataType)
+        DataTypeMismatch(
+          errorSubClass = "CANNOT_CONVERT_TO_JSON",
+          messageParameters = Map(
+            "name" -> toSQLId(name),
+            "type" -> toSQLType(dataType)))
     }
   }
-
-  /**
-   * Verify if the schema is supported in JSON parsing.
-   */
-  def verifySchema(schema: StructType): Unit = {
-    schema.foreach(field => verifyType(field.name, field.dataType))
-  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bc778abc985..c1496611d15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1311,11 +1311,6 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
         s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a 
${MapType.simpleString}")
   }
 
-  def cannotConvertColumnToJSONError(name: String, dataType: DataType): 
Throwable = {
-    new UnsupportedOperationException(
-      s"Unable to convert column $name of type ${dataType.catalogString} to 
JSON.")
-  }
-
   def malformedRecordsDetectedInSchemaInferenceError(e: Throwable): Throwable 
= {
     new SparkException("Malformed records are detected in schema inference. " +
       s"Parse Mode: ${FailFastMode.name}.", e)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index af071727b10..ab6da87287b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -25,7 +25,6 @@ import org.scalatest.exceptions.TestFailedException
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.util._
@@ -864,10 +863,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
       (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" 
: "test"}}""")
     ).foreach{
       case(schema, jsonData) =>
-        assert(JsonToStructs(schema, Map.empty, 
Literal(jsonData)).checkInputDataTypes() match {
-          case TypeCheckFailure(_) => true
-          case _ => false
-        })
+        assert(JsonToStructs(schema, Map.empty, 
Literal(jsonData)).checkInputDataTypes().isFailure)
       }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 344f40eef45..443b18f7281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
+import 
org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.internal.SQLConf
@@ -357,7 +358,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def json(paths: String*): DataFrame = {
-    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+    userSpecifiedSchema.foreach(checkJsonSchema)
     format("json").load(paths : _*)
   }
 
@@ -406,7 +407,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
-    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+    userSpecifiedSchema.foreach(checkJsonSchema)
     val schema = userSpecifiedSchema.map {
       case s if !SQLConf.get.getConf(
         SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => 
s.asNullable
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
index d511594c5de..a288b5ebf8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
+import org.apache.spark.SparkException
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.DataType
 
 object JsonUtils {
   /**
@@ -48,4 +53,14 @@ object JsonUtils {
       json.sample(withReplacement = false, options.samplingRatio, 1)
     }
   }
+
+  def checkJsonSchema(schema: DataType): Unit = {
+    ExprUtils.checkJsonSchema(schema) match {
+      case DataTypeMismatch("INVALID_JSON_MAP_KEY_TYPE", _) =>
+        throw QueryCompilationErrors.invalidJsonSchema(schema)
+      case TypeCheckSuccess =>
+      case result =>
+        throw SparkException.internalError(s"Unknown type check result: 
$result.")
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index ba825a28852..d4621468f84 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -25,7 +25,6 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
@@ -33,6 +32,7 @@ import 
org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import 
org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, 
FileDataSourceV2}
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.sources.StreamSourceProvider
@@ -232,7 +232,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * @since 2.0.0
    */
   def json(path: String): DataFrame = {
-    userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+    userSpecifiedSchema.foreach(checkJsonSchema)
     format("json").load(path)
   }
 
diff --git 
a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
index 301e5cc78df..291739f9c3b 100644
--- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -95,7 +95,21 @@ select schema_of_csv(null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv 
should be a foldable string expression and not null; however, got NULL.; line 1 
pos 7
+{
+  "errorClass" : "DATATYPE_MISMATCH",
+  "errorSubClass" : "NON_FOLDABLE_INPUT",
+  "messageParameters" : {
+    "inputExpr" : "\"NULL\"",
+    "sqlExpr" : "\"schema_of_csv(NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "schema_of_csv(null)"
+  } ]
+}
 
 
 -- !query
@@ -112,7 +126,21 @@ SELECT schema_of_csv(csvField) FROM csvTable
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(csvtable.csvField)' due to data type mismatch: 
The input csv should be a foldable string expression and not null; however, got 
csvtable.csvField.; line 1 pos 7
+{
+  "errorClass" : "DATATYPE_MISMATCH",
+  "errorSubClass" : "NON_FOLDABLE_INPUT",
+  "messageParameters" : {
+    "inputExpr" : "\"csvField\"",
+    "sqlExpr" : "\"schema_of_csv(csvField)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 30,
+    "fragment" : "schema_of_csv(csvField)"
+  } ]
+}
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index b9620a4f9e6..ec6d3871178 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -403,7 +403,21 @@ select schema_of_json(null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input 
json should be a foldable string expression and not null; however, got NULL.; 
line 1 pos 7
+{
+  "errorClass" : "DATATYPE_MISMATCH",
+  "errorSubClass" : "NON_FOLDABLE_INPUT",
+  "messageParameters" : {
+    "inputExpr" : "\"NULL\"",
+    "sqlExpr" : "\"schema_of_json(NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 27,
+    "fragment" : "schema_of_json(null)"
+  } ]
+}
 
 
 -- !query
@@ -420,7 +434,21 @@ SELECT schema_of_json(jsonField) FROM jsonTable
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(jsontable.jsonField)' due to data type 
mismatch: The input json should be a foldable string expression and not null; 
however, got jsontable.jsonField.; line 1 pos 7
+{
+  "errorClass" : "DATATYPE_MISMATCH",
+  "errorSubClass" : "NON_FOLDABLE_INPUT",
+  "messageParameters" : {
+    "inputExpr" : "\"jsonField\"",
+    "sqlExpr" : "\"schema_of_json(jsonField)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 32,
+    "fragment" : "schema_of_json(jsonField)"
+  } ]
+}
 
 
 -- !query
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 085084d915f..83bdd247cce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -488,11 +488,15 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSparkSession {
 
   test("SPARK-24027: from_json of a map with unsupported key type") {
     val schema = MapType(StructType(StructField("f", IntegerType) :: Nil), 
StringType)
-    val startMsg = "cannot resolve 'entries' due to data type mismatch:"
-    val exception = intercept[AnalysisException] {
-      Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema))
-    }.getMessage
-    assert(exception.contains(startMsg))
+    checkError(
+      exception = intercept[AnalysisException] {
+        Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema))
+      },
+      errorClass = "DATATYPE_MISMATCH",
+      errorSubClass = Some("INVALID_JSON_MAP_KEY_TYPE"),
+      parameters = Map(
+        "schema" -> "\"MAP<STRUCT<f: INT>, STRING>\"",
+        "sqlExpr" -> "\"entries\""))
   }
 
   test("SPARK-24709: infers schemas of json strings and pass them to 
from_json") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 92aadb6779e..d9930cfbe8a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.errors
 import org.apache.spark.sql.{AnalysisException, ClassData, 
IntegratedUDFTestUtils, QueryTest, Row}
 import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
-import org.apache.spark.sql.functions.{grouping, grouping_id, lit, struct, 
sum, udf}
+import org.apache.spark.sql.functions.{from_json, grouping, grouping_id, lit, 
struct, sum, udf}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, MapType, StringType, 
StructField, StructType}
@@ -655,6 +655,16 @@ class QueryCompilationErrorsSuite
         fragment = "LATERAL VIEW array_contains(value, 1) AS explodedvalue",
         start = 62, stop = 115))
   }
+
+  test("DATATYPE_MISMATCH.INVALID_JSON_SCHEMA: invalid top type passed to 
from_json()") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        Seq("""{"a":1}""").toDF("a").select(from_json($"a", 
IntegerType)).collect()
+      },
+      errorClass = "DATATYPE_MISMATCH",
+      errorSubClass = Some("INVALID_JSON_SCHEMA"),
+      parameters = Map("schema" -> "\"INT\"", "sqlExpr" -> "\"from_json(a)\""))
+  }
 }
 
 class MyCastToString extends SparkUserDefinedFunction(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to