This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 595f14881f9 [SPARK-40359][SQL] Migrate type check fails in CSV/JSON
expressions to error classes
595f14881f9 is described below
commit 595f14881f92a09b03888dd69f69f9e7edf2a060
Author: Max Gekk <[email protected]>
AuthorDate: Thu Sep 22 12:57:23 2022 +0300
[SPARK-40359][SQL] Migrate type check fails in CSV/JSON expressions to
error classes
### What changes were proposed in this pull request?
In the PR, I propose to use error classes in the case of type check failures
in CSV/JSON expressions.
### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages, and improves
search-ability of errors.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```
Closes #37902 from MaxGekk/type-check-fail-json-csv.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 30 ++++++++
.../spark/sql/catalyst/expressions/ExprUtils.scala | 26 ++++---
.../sql/catalyst/expressions/csvExpressions.scala | 11 +--
.../sql/catalyst/expressions/jsonExpressions.scala | 82 ++++++++++------------
.../spark/sql/catalyst/json/JacksonUtils.scala | 28 ++++----
.../spark/sql/errors/QueryExecutionErrors.scala | 5 --
.../expressions/JsonExpressionsSuite.scala | 6 +-
.../org/apache/spark/sql/DataFrameReader.scala | 5 +-
.../sql/execution/datasources/json/JsonUtils.scala | 15 ++++
.../spark/sql/streaming/DataStreamReader.scala | 4 +-
.../sql-tests/results/csv-functions.sql.out | 32 ++++++++-
.../sql-tests/results/json-functions.sql.out | 32 ++++++++-
.../org/apache/spark/sql/JsonFunctionsSuite.scala | 14 ++--
.../sql/errors/QueryCompilationErrorsSuite.scala | 12 +++-
14 files changed, 205 insertions(+), 97 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 823780264cb..affbafdfc03 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -96,6 +96,11 @@
"the binary operator requires the input type <inputType>, not
<actualDataType>."
]
},
+ "CANNOT_CONVERT_TO_JSON" : {
+ "message" : [
+ "Unable to convert column <name> of type <type> to JSON."
+ ]
+ },
"CAST_WITHOUT_SUGGESTION" : {
"message" : [
"cannot cast <srcType> to <targetType>."
@@ -113,10 +118,35 @@
"To convert values from <srcType> to <targetType>, you can use the
functions <functionNames> instead."
]
},
+ "INVALID_JSON_MAP_KEY_TYPE" : {
+ "message" : [
+ "Input schema <schema> can only contain STRING as a key type for a
MAP."
+ ]
+ },
+ "INVALID_JSON_SCHEMA" : {
+ "message" : [
+ "Input schema <schema> must be a struct, an array or a map."
+ ]
+ },
+ "NON_FOLDABLE_INPUT" : {
+ "message" : [
+ "the input should be a foldable string expression and not null;
however, got <inputExpr>."
+ ]
+ },
+ "NON_STRING_TYPE" : {
+ "message" : [
+ "all arguments must be strings."
+ ]
+ },
"UNEXPECTED_INPUT_TYPE" : {
"message" : [
"parameter <paramIndex> requires <requiredType> type, however,
<inputSql> is of <inputType> type."
]
+ },
+ "WRONG_NUM_PARAMS" : {
+ "message" : [
+ "wrong number of parameters: <actualNum>."
+ ]
}
}
},
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 204b753d6e4..3e10b820aa6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.catalyst.expressions
import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
import java.util.Locale
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch,
TypeCheckSuccess}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CharVarcharUtils}
-import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase,
QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
-object ExprUtils {
+object ExprUtils extends QueryErrorsBase {
def evalTypeExpr(exp: Expression): DataType = {
if (exp.foldable) {
@@ -97,18 +99,22 @@ object ExprUtils {
/**
* Check if the schema is valid for Json
- * @param schema
+ * @param schema The schema to check.
* @return
- * None if the schema is valid
- * Some(msg) with the error message if the schema is not valid
+ * `TypeCheckSuccess` if the schema is valid
+ `DataTypeMismatch` with an error message if the schema is not valid
*/
- def checkJsonSchema(schema: DataType): Option[Throwable] =
- if (schema.existsRecursively {
+ def checkJsonSchema(schema: DataType): TypeCheckResult = {
+ val isInvalid = schema.existsRecursively {
case MapType(keyType, _, _) if keyType != StringType => true
case _ => false
- }) {
- Some(QueryCompilationErrors.invalidJsonSchema(schema))
+ }
+ if (isInvalid) {
+ DataTypeMismatch(
+ errorSubClass = "INVALID_JSON_MAP_KEY_TYPE",
+ messageParameters = Map("schema" -> toSQLType(schema)))
} else {
- None
+ TypeCheckSuccess
}
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 80cd0f7034d..8965e30e4e9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -23,10 +23,11 @@ import com.univocity.parsers.csv.CsvParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.csv._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -166,7 +167,7 @@ case class CsvToStructs(
case class SchemaOfCsv(
child: Expression,
options: Map[String, String])
- extends UnaryExpression with CodegenFallback {
+ extends UnaryExpression with CodegenFallback with QueryErrorsBase {
def this(child: Expression) = this(child, Map.empty[String, String])
@@ -185,9 +186,9 @@ case class SchemaOfCsv(
if (child.foldable && csv != null) {
super.checkInputDataTypes()
} else {
- TypeCheckResult.TypeCheckFailure(
- "The input csv should be a foldable string expression and not null; " +
- s"however, got ${child.sql}.")
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map("inputExpr" -> toSQLExpr(child)))
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 4e6191de000..12da5933ece 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -27,11 +27,12 @@ import com.fasterxml.jackson.core.json.JsonReadFeature
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.trees.TreePattern.{JSON_TO_STRUCT,
TreePattern}
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -392,11 +393,15 @@ case class JsonTuple(children: Seq[Expression])
override def checkInputDataTypes(): TypeCheckResult = {
if (children.length < 2) {
- TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two
arguments")
+ DataTypeMismatch(
+ errorSubClass = "WRONG_NUM_PARAMS",
+ messageParameters = Map("actualNum" -> children.length.toString))
} else if (children.forall(child =>
StringType.acceptsType(child.dataType))) {
TypeCheckResult.TypeCheckSuccess
} else {
- TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all
arguments are strings")
+ DataTypeMismatch(
+ errorSubClass = "NON_STRING_TYPE",
+ messageParameters = Map("funcName" -> prettyName))
}
}
@@ -535,8 +540,12 @@ case class JsonToStructs(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
- extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback
with ExpectsInputTypes
- with NullIntolerant {
+ extends UnaryExpression
+ with TimeZoneAwareExpression
+ with CodegenFallback
+ with ExpectsInputTypes
+ with NullIntolerant
+ with QueryErrorsBase {
// The JSON input data might be missing certain fields. We force the
nullability
// of the user-provided schema to avoid data corruptions. In particular, the
parquet-mr encoder
@@ -566,13 +575,12 @@ case class JsonToStructs(
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | _: ArrayType | _: MapType =>
- ExprUtils.checkJsonSchema(nullableSchema).map { e =>
- TypeCheckResult.TypeCheckFailure(e.getMessage)
- } getOrElse {
- super.checkInputDataTypes()
- }
- case _ => TypeCheckResult.TypeCheckFailure(
- s"Input schema ${nullableSchema.catalogString} must be a struct, an
array or a map.")
+ val checkResult = ExprUtils.checkJsonSchema(nullableSchema)
+ if (checkResult.isFailure) checkResult else super.checkInputDataTypes()
+ case _ =>
+ DataTypeMismatch(
+ errorSubClass = "INVALID_JSON_SCHEMA",
+ messageParameters = Map("schema" -> toSQLType(nullableSchema)))
}
// This converts parsed rows to the desired output by the given schema.
@@ -663,8 +671,13 @@ case class StructsToJson(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
- extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback
- with ExpectsInputTypes with NullIntolerant {
+ extends UnaryExpression
+ with TimeZoneAwareExpression
+ with CodegenFallback
+ with ExpectsInputTypes
+ with NullIntolerant
+ with QueryErrorsBase {
+
override def nullable: Boolean = true
def this(options: Map[String, String], child: Expression) = this(options,
child, None)
@@ -716,33 +729,12 @@ case class StructsToJson(
override def dataType: DataType = StringType
override def checkInputDataTypes(): TypeCheckResult = inputSchema match {
- case struct: StructType =>
- try {
- JacksonUtils.verifySchema(struct)
- TypeCheckResult.TypeCheckSuccess
- } catch {
- case e: UnsupportedOperationException =>
- TypeCheckResult.TypeCheckFailure(e.getMessage)
- }
- case map: MapType =>
- try {
- JacksonUtils.verifyType(prettyName, map)
- TypeCheckResult.TypeCheckSuccess
- } catch {
- case e: UnsupportedOperationException =>
- TypeCheckResult.TypeCheckFailure(e.getMessage)
- }
- case array: ArrayType =>
- try {
- JacksonUtils.verifyType(prettyName, array)
- TypeCheckResult.TypeCheckSuccess
- } catch {
- case e: UnsupportedOperationException =>
- TypeCheckResult.TypeCheckFailure(e.getMessage)
- }
- case _ => TypeCheckResult.TypeCheckFailure(
- s"Input type ${child.dataType.catalogString} must be a struct, array of
structs or " +
- "a map or array of map.")
+ case dt @ (_: StructType | _: MapType | _: ArrayType) =>
+ JacksonUtils.verifyType(prettyName, dt)
+ case _ =>
+ DataTypeMismatch(
+ errorSubClass = "INVALID_JSON_SCHEMA",
+ messageParameters = Map("schema" -> toSQLType(child.dataType)))
}
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
@@ -775,7 +767,7 @@ case class StructsToJson(
case class SchemaOfJson(
child: Expression,
options: Map[String, String])
- extends UnaryExpression with CodegenFallback {
+ extends UnaryExpression with CodegenFallback with QueryErrorsBase {
def this(child: Expression) = this(child, Map.empty[String, String])
@@ -803,9 +795,9 @@ case class SchemaOfJson(
if (child.foldable && json != null) {
super.checkInputDataTypes()
} else {
- TypeCheckResult.TypeCheckFailure(
- "The input json should be a foldable string expression and not null; "
+
- s"however, got ${child.sql}.")
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map("inputExpr" -> toSQLExpr(child)))
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 9286e29844b..b103eda6fc7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch,
TypeCheckSuccess}
+import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.types._
-object JacksonUtils {
+object JacksonUtils extends QueryErrorsBase {
/**
* Advance the parser until a null or a specific token is found
*/
@@ -33,11 +35,14 @@ object JacksonUtils {
}
}
- def verifyType(name: String, dataType: DataType): Unit = {
+ def verifyType(name: String, dataType: DataType): TypeCheckResult = {
dataType match {
- case NullType | _: AtomicType | CalendarIntervalType =>
+ case NullType | _: AtomicType | CalendarIntervalType => TypeCheckSuccess
- case st: StructType => st.foreach(field => verifyType(field.name,
field.dataType))
+ case st: StructType =>
+ st.foldLeft(TypeCheckSuccess: TypeCheckResult) { case (currResult,
field) =>
+ if (currResult.isFailure) currResult else verifyType(field.name,
field.dataType)
+ }
case at: ArrayType => verifyType(name, at.elementType)
@@ -48,14 +53,11 @@ object JacksonUtils {
case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
case _ =>
- throw QueryExecutionErrors.cannotConvertColumnToJSONError(name,
dataType)
+ DataTypeMismatch(
+ errorSubClass = "CANNOT_CONVERT_TO_JSON",
+ messageParameters = Map(
+ "name" -> toSQLId(name),
+ "type" -> toSQLType(dataType)))
}
}
-
- /**
- * Verify if the schema is supported in JSON parsing.
- */
- def verifySchema(schema: StructType): Unit = {
- schema.foreach(field => verifyType(field.name, field.dataType))
- }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bc778abc985..c1496611d15 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1311,11 +1311,6 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a
${MapType.simpleString}")
}
- def cannotConvertColumnToJSONError(name: String, dataType: DataType):
Throwable = {
- new UnsupportedOperationException(
- s"Unable to convert column $name of type ${dataType.catalogString} to
JSON.")
- }
-
def malformedRecordsDetectedInSchemaInferenceError(e: Throwable): Throwable
= {
new SparkException("Malformed records are detected in schema inference. " +
s"Parse Mode: ${FailFastMode.name}.", e)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index af071727b10..ab6da87287b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -25,7 +25,6 @@ import org.scalatest.exceptions.TestFailedException
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.util._
@@ -864,10 +863,7 @@ class JsonExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper with
(MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1"
: "test"}}""")
).foreach{
case(schema, jsonData) =>
- assert(JsonToStructs(schema, Map.empty,
Literal(jsonData)).checkInputDataTypes() match {
- case TypeCheckFailure(_) => true
- case _ => false
- })
+ assert(JsonToStructs(schema, Map.empty,
Literal(jsonData)).checkInputDataTypes().isFailure)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 344f40eef45..443b18f7281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
+import
org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.SQLConf
@@ -357,7 +358,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*/
@scala.annotation.varargs
def json(paths: String*): DataFrame = {
- userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+ userSpecifiedSchema.foreach(checkJsonSchema)
format("json").load(paths : _*)
}
@@ -406,7 +407,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
- userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+ userSpecifiedSchema.foreach(checkJsonSchema)
val schema = userSpecifiedSchema.map {
case s if !SQLConf.get.getConf(
SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) =>
s.asNullable
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
index d511594c5de..a288b5ebf8b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonUtils.scala
@@ -17,10 +17,15 @@
package org.apache.spark.sql.execution.datasources.json
+import org.apache.spark.SparkException
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
+import
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch,
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.DataType
object JsonUtils {
/**
@@ -48,4 +53,14 @@ object JsonUtils {
json.sample(withReplacement = false, options.samplingRatio, 1)
}
}
+
+ def checkJsonSchema(schema: DataType): Unit = {
+ ExprUtils.checkJsonSchema(schema) match {
+ case DataTypeMismatch("INVALID_JSON_MAP_KEY_TYPE", _) =>
+ throw QueryCompilationErrors.invalidJsonSchema(schema)
+ case TypeCheckSuccess =>
+ case result =>
+ throw SparkException.internalError(s"Unknown type check result:
$result.")
+ }
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index ba825a28852..d4621468f84 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -25,7 +25,6 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
@@ -33,6 +32,7 @@ import
org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
+import
org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils,
FileDataSourceV2}
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.sources.StreamSourceProvider
@@ -232,7 +232,7 @@ final class DataStreamReader private[sql](sparkSession:
SparkSession) extends Lo
* @since 2.0.0
*/
def json(path: String): DataFrame = {
- userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
+ userSpecifiedSchema.foreach(checkJsonSchema)
format("json").load(path)
}
diff --git
a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
index 301e5cc78df..291739f9c3b 100644
--- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -95,7 +95,21 @@ select schema_of_csv(null)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv
should be a foldable string expression and not null; however, got NULL.; line 1
pos 7
+{
+ "errorClass" : "DATATYPE_MISMATCH",
+ "errorSubClass" : "NON_FOLDABLE_INPUT",
+ "messageParameters" : {
+ "inputExpr" : "\"NULL\"",
+ "sqlExpr" : "\"schema_of_csv(NULL)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 26,
+ "fragment" : "schema_of_csv(null)"
+ } ]
+}
-- !query
@@ -112,7 +126,21 @@ SELECT schema_of_csv(csvField) FROM csvTable
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(csvtable.csvField)' due to data type mismatch:
The input csv should be a foldable string expression and not null; however, got
csvtable.csvField.; line 1 pos 7
+{
+ "errorClass" : "DATATYPE_MISMATCH",
+ "errorSubClass" : "NON_FOLDABLE_INPUT",
+ "messageParameters" : {
+ "inputExpr" : "\"csvField\"",
+ "sqlExpr" : "\"schema_of_csv(csvField)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 30,
+ "fragment" : "schema_of_csv(csvField)"
+ } ]
+}
-- !query
diff --git
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index b9620a4f9e6..ec6d3871178 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -403,7 +403,21 @@ select schema_of_json(null)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input
json should be a foldable string expression and not null; however, got NULL.;
line 1 pos 7
+{
+ "errorClass" : "DATATYPE_MISMATCH",
+ "errorSubClass" : "NON_FOLDABLE_INPUT",
+ "messageParameters" : {
+ "inputExpr" : "\"NULL\"",
+ "sqlExpr" : "\"schema_of_json(NULL)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 27,
+ "fragment" : "schema_of_json(null)"
+ } ]
+}
-- !query
@@ -420,7 +434,21 @@ SELECT schema_of_json(jsonField) FROM jsonTable
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(jsontable.jsonField)' due to data type
mismatch: The input json should be a foldable string expression and not null;
however, got jsontable.jsonField.; line 1 pos 7
+{
+ "errorClass" : "DATATYPE_MISMATCH",
+ "errorSubClass" : "NON_FOLDABLE_INPUT",
+ "messageParameters" : {
+ "inputExpr" : "\"jsonField\"",
+ "sqlExpr" : "\"schema_of_json(jsonField)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 32,
+ "fragment" : "schema_of_json(jsonField)"
+ } ]
+}
-- !query
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 085084d915f..83bdd247cce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -488,11 +488,15 @@ class JsonFunctionsSuite extends QueryTest with
SharedSparkSession {
test("SPARK-24027: from_json of a map with unsupported key type") {
val schema = MapType(StructType(StructField("f", IntegerType) :: Nil),
StringType)
- val startMsg = "cannot resolve 'entries' due to data type mismatch:"
- val exception = intercept[AnalysisException] {
- Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema))
- }.getMessage
- assert(exception.contains(startMsg))
+ checkError(
+ exception = intercept[AnalysisException] {
+ Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema))
+ },
+ errorClass = "DATATYPE_MISMATCH",
+ errorSubClass = Some("INVALID_JSON_MAP_KEY_TYPE"),
+ parameters = Map(
+ "schema" -> "\"MAP<STRUCT<f: INT>, STRING>\"",
+ "sqlExpr" -> "\"entries\""))
}
test("SPARK-24709: infers schemas of json strings and pass them to
from_json") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 92aadb6779e..d9930cfbe8a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.errors
import org.apache.spark.sql.{AnalysisException, ClassData,
IntegratedUDFTestUtils, QueryTest, Row}
import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
-import org.apache.spark.sql.functions.{grouping, grouping_id, lit, struct,
sum, udf}
+import org.apache.spark.sql.functions.{from_json, grouping, grouping_id, lit,
struct, sum, udf}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, MapType, StringType,
StructField, StructType}
@@ -655,6 +655,16 @@ class QueryCompilationErrorsSuite
fragment = "LATERAL VIEW array_contains(value, 1) AS explodedvalue",
start = 62, stop = 115))
}
+
+ test("DATATYPE_MISMATCH.INVALID_JSON_SCHEMA: invalid top type passed to
from_json()") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ Seq("""{"a":1}""").toDF("a").select(from_json($"a",
IntegerType)).collect()
+ },
+ errorClass = "DATATYPE_MISMATCH",
+ errorSubClass = Some("INVALID_JSON_SCHEMA"),
+ parameters = Map("schema" -> "\"INT\"", "sqlExpr" -> "\"from_json(a)\""))
+ }
}
class MyCastToString extends SparkUserDefinedFunction(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]