This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69946bb5c70 [SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS
69946bb5c70 is described below
commit 69946bb5c707657bf0840b21356fbe95b8524ab9
Author: Koray Beyaz <[email protected]>
AuthorDate: Mon Apr 24 11:30:11 2023 +0300
[SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS
### What changes were proposed in this pull request?
This PR proposes to assign the name "CANNOT_MERGE_SCHEMAS" to the legacy error class _LEGACY_ERROR_TEMP_2247.
It also proposes to display both the left and right schemas in the exception so that they can be compared. Please let me know if you prefer the old error message with a single schema.
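For reference, a minimal sketch of how two parquet directories with incompatible schemas can be produced (mirroring the test changed below; `path` is assumed to be a temporary directory):
```scala
// Two partitions whose `id` columns have incompatible types.
spark.range(3).write.parquet(s"$path/p=1")            // id: BIGINT
spark.range(3).selectExpr("CAST(id AS INT) AS id")
  .write.parquet(s"$path/p=2")                        // id: INT
```
Reading them back with `mergeSchema` enabled then fails as shown below.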
This is the stack trace after the changes:
```
scala> spark.read.option("mergeSchema", "true").parquet(path)
org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas:
Initial schema:
"STRUCT<id: BIGINT>"
Schema that cannot be merged with the initial schema:
"STRUCT<id: INT>".
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingSchemaError(QueryExecutionErrors.scala:2355)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:104)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:100)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:100)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:496)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:78)
  at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
  at scala.Option.orElse(Option.scala:447)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:548)
  ... 49 elided
Caused by: org.apache.spark.SparkException: [CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE] Failed to merge incompatible data types "BIGINT" and "INT".
  at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotMergeIncompatibleDataTypesError(QueryExecutionErrors.scala:1326)
  at org.apache.spark.sql.types.StructType$.$anonfun$merge$3(StructType.scala:610)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:602)
  at org.apache.spark.sql.types.StructType$.$anonfun$merge$2$adapted(StructType.scala:599)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:599)
  at org.apache.spark.sql.types.StructType$.mergeInternal(StructType.scala:647)
  at org.apache.spark.sql.types.StructType$.merge(StructType.scala:593)
  at org.apache.spark.sql.types.StructType.merge(StructType.scala:498)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:102)
  ... 67 more
```
### Why are the changes needed?
We should assign a proper name to the _LEGACY_ERROR_TEMP* error classes.
### Does this PR introduce _any_ user-facing change?
Yes, users will see an improved error message.
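For illustration only (not part of this patch), caller code can now match on the stable error class through the `SparkThrowable` interface instead of parsing message text:
```scala
import org.apache.spark.SparkException

// Hypothetical caller-side handling; `path` points at parquet files with
// incompatible schemas, as in the reproduction sketch above.
try {
  spark.read.option("mergeSchema", "true").parquet(path)
} catch {
  case e: SparkException if e.getErrorClass == "CANNOT_MERGE_SCHEMAS" =>
    // getMessageParameters exposes the "left" and "right" schema strings.
    println(s"Schema merge failed: ${e.getMessageParameters}")
}
```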
### How was this patch tested?
Changed an existing test case to test the new error class with the `checkError` utility.
Closes #40810 from kori73/assign-name-2247.
Lead-authored-by: Koray Beyaz <[email protected]>
Co-authored-by: kori73 <[email protected]>
Co-authored-by: Koray Beyaz <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 16 ++++++++------
.../spark/sql/errors/QueryExecutionErrors.scala | 10 +++++----
.../execution/datasources/SchemaMergeUtils.scala | 4 ++--
.../datasources/parquet/ParquetSchemaSuite.scala | 25 +++++++++++++---------
4 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 11b280efad8..370508b70a8 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -94,6 +94,16 @@
     ],
     "sqlState" : "42825"
   },
+  "CANNOT_MERGE_SCHEMAS" : {
+    "message" : [
+      "Failed merging schemas:",
+      "Initial schema:",
+      "<left>",
+      "Schema that cannot be merged with the initial schema:",
+      "<right>."
+    ],
+    "sqlState" : "42KD9"
+  },
   "CANNOT_MODIFY_CONFIG" : {
     "message" : [
       "Cannot modify the value of the Spark config: <key>.",
@@ -4817,12 +4827,6 @@
       "Table does not support dynamic partition overwrite: <table>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2247" : {
-    "message" : [
-      "Failed merging schema:",
-      "<schema>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2248" : {
     "message" : [
       "Cannot broadcast the table over <maxBroadcastTableRows> rows: <numRows> rows."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 11fe84990c1..6c4066e638c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2345,11 +2345,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       cause = null)
   }
 
-  def failedMergingSchemaError(schema: StructType, e: SparkException): Throwable = {
+  def failedMergingSchemaError(
+      leftSchema: StructType,
+      rightSchema: StructType,
+      e: SparkException): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2247",
-      messageParameters = Map(
-        "schema" -> schema.treeString),
+      errorClass = "CANNOT_MERGE_SCHEMAS",
+      messageParameters = Map("left" -> toSQLType(leftSchema), "right" -> toSQLType(rightSchema)),
       cause = e)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index babecfc1f38..35d9b5d6034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -86,7 +86,7 @@ object SchemaMergeUtils extends Logging {
           try {
             mergedSchema = mergedSchema.merge(schema)
           } catch { case cause: SparkException =>
-            throw QueryExecutionErrors.failedMergingSchemaError(schema, cause)
+            throw QueryExecutionErrors.failedMergingSchemaError(mergedSchema, schema, cause)
           }
         }
         Iterator.single(mergedSchema)
@@ -101,7 +101,7 @@ object SchemaMergeUtils extends Logging {
       try {
         finalSchema = finalSchema.merge(schema)
       } catch { case cause: SparkException =>
-        throw QueryExecutionErrors.failedMergingSchemaError(schema, cause)
+        throw QueryExecutionErrors.failedMergingSchemaError(finalSchema, schema, cause)
       }
     }
     Some(finalSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5589c61be7a..80c27049e5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -28,6 +28,7 @@ import org.apache.parquet.schema.Type._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
@@ -980,20 +981,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("schema merging failure error message") {
+  test("CANNOT_MERGE_SCHEMAS: Failed merging schemas") {
     import testImplicits._
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).select($"id" cast IntegerType as Symbol("id"))
-        .write.parquet(s"$path/p=2")
-
-      val message = intercept[SparkException] {
-        spark.read.option("mergeSchema", "true").parquet(path).schema
-      }.getMessage
-
-      assert(message.contains("Failed merging schema"))
+      val df1 = spark.range(3)
+      df1.write.parquet(s"$path/p=1")
+      val df2 = spark.range(3).select($"id" cast IntegerType as Symbol("id"))
+      df2.write.parquet(s"$path/p=2")
+      checkError(
+        exception = intercept[SparkException] {
+          spark.read.option("mergeSchema", "true").parquet(path)
+        },
+        errorClass = "CANNOT_MERGE_SCHEMAS",
+        sqlState = "42KD9",
+        parameters = Map(
+          "left" -> toSQLType(df1.schema),
+          "right" -> toSQLType(df2.schema)))
     }
   }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]