This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 69946bb5c70 [SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS
69946bb5c70 is described below

commit 69946bb5c707657bf0840b21356fbe95b8524ab9
Author: Koray Beyaz <koraybeya...@gmail.com>
AuthorDate: Mon Apr 24 11:30:11 2023 +0300

    [SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to assign the name `CANNOT_MERGE_SCHEMAS` to `_LEGACY_ERROR_TEMP_2247`.
    
    It also proposes to display both the left and right schemas in the exception so that one can compare them. Please let me know if you prefer the old error message with a single schema.
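    
    For reference, here is a minimal way to hit this path (a sketch, assuming `path` points to an empty temporary directory; it mirrors the updated test below):
    
    ```
    // Write two Parquet parts whose schemas conflict on the same column:
    // p=1 stores id as BIGINT, p=2 stores id as INT.
    spark.range(3).write.parquet(s"$path/p=1")
    spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
    
    // Schema merging then fails with CANNOT_MERGE_SCHEMAS.
    spark.read.option("mergeSchema", "true").parquet(path)
    ```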
    
    This is the stack trace after the changes:
    
    ```
    scala> spark.read.option("mergeSchema", "true").parquet(path)
    org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas:
    Initial schema:
    "STRUCT<id: BIGINT>"
    Schema that cannot be merged with the initial schema:
    "STRUCT<id: INT>".
      at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingSchemaError(QueryExecutionErrors.scala:2355)
      at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:104)
      at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:100)
      at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
      at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:100)
      at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:496)
      at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
      at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:78)
      at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
      at scala.Option.orElse(Option.scala:447)
      at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
      at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
      at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
      at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
      at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:548)
      ... 49 elided
    Caused by: org.apache.spark.SparkException: [CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE] Failed to merge incompatible data types "BIGINT" and "INT".
      at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotMergeIncompatibleDataTypesError(QueryExecutionErrors.scala:1326)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$3(StructType.scala:610)
      at scala.Option.map(Option.scala:230)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:602)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$2$adapted(StructType.scala:599)
      at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:599)
      at org.apache.spark.sql.types.StructType$.mergeInternal(StructType.scala:647)
      at org.apache.spark.sql.types.StructType$.merge(StructType.scala:593)
      at org.apache.spark.sql.types.StructType.merge(StructType.scala:498)
      at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:102)
      ... 67 more
    ```
    
    ### Why are the changes needed?
    
    We should assign a proper name to the _LEGACY_ERROR_TEMP_* error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will see an improved error message.
    
    ### How was this patch tested?
    
    Changed an existing test case to test the new error class with the `checkError` utility.
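    
    Condensed from the updated test in the diff below, the assertion is:
    
    ```
    checkError(
      exception = intercept[SparkException] {
        spark.read.option("mergeSchema", "true").parquet(path)
      },
      errorClass = "CANNOT_MERGE_SCHEMAS",
      sqlState = "42KD9",
      parameters = Map(
        "left" -> toSQLType(df1.schema),
        "right" -> toSQLType(df2.schema)))
    ```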
    
    Closes #40810 from kori73/assign-name-2247.
    
    Lead-authored-by: Koray Beyaz <koraybeya...@gmail.com>
    Co-authored-by: kori73 <koray.beya...@gmail.com>
    Co-authored-by: Koray Beyaz <koray.beya...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 16 ++++++++------
 .../spark/sql/errors/QueryExecutionErrors.scala    | 10 +++++----
 .../execution/datasources/SchemaMergeUtils.scala   |  4 ++--
 .../datasources/parquet/ParquetSchemaSuite.scala   | 25 +++++++++++++---------
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 11b280efad8..370508b70a8 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -94,6 +94,16 @@
     ],
     "sqlState" : "42825"
   },
+  "CANNOT_MERGE_SCHEMAS" : {
+    "message" : [
+      "Failed merging schemas:",
+      "Initial schema:",
+      "<left>",
+      "Schema that cannot be merged with the initial schema:",
+      "<right>."
+    ],
+    "sqlState" : "42KD9"
+  },
   "CANNOT_MODIFY_CONFIG" : {
     "message" : [
       "Cannot modify the value of the Spark config: <key>.",
@@ -4817,12 +4827,6 @@
       "Table does not support dynamic partition overwrite: <table>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2247" : {
-    "message" : [
-      "Failed merging schema:",
-      "<schema>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2248" : {
     "message" : [
       "Cannot broadcast the table over <maxBroadcastTableRows> rows: <numRows> 
rows."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 11fe84990c1..6c4066e638c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2345,11 +2345,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       cause = null)
   }
 
-  def failedMergingSchemaError(schema: StructType, e: SparkException): Throwable = {
+  def failedMergingSchemaError(
+      leftSchema: StructType,
+      rightSchema: StructType,
+      e: SparkException): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2247",
-      messageParameters = Map(
-        "schema" -> schema.treeString),
+      errorClass = "CANNOT_MERGE_SCHEMAS",
+      messageParameters = Map("left" -> toSQLType(leftSchema), "right" -> toSQLType(rightSchema)),
       cause = e)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index babecfc1f38..35d9b5d6034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -86,7 +86,7 @@ object SchemaMergeUtils extends Logging {
               try {
                 mergedSchema = mergedSchema.merge(schema)
               } catch { case cause: SparkException =>
-                throw QueryExecutionErrors.failedMergingSchemaError(schema, cause)
+                throw QueryExecutionErrors.failedMergingSchemaError(mergedSchema, schema, cause)
               }
             }
             Iterator.single(mergedSchema)
@@ -101,7 +101,7 @@ object SchemaMergeUtils extends Logging {
         try {
           finalSchema = finalSchema.merge(schema)
         } catch { case cause: SparkException =>
-          throw QueryExecutionErrors.failedMergingSchemaError(schema, cause)
+          throw QueryExecutionErrors.failedMergingSchemaError(finalSchema, schema, cause)
         }
       }
       Some(finalSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5589c61be7a..80c27049e5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -28,6 +28,7 @@ import org.apache.parquet.schema.Type._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
@@ -980,20 +981,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("schema merging failure error message") {
+  test("CANNOT_MERGE_SCHEMAS: Failed merging schemas") {
     import testImplicits._
 
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).select($"id" cast IntegerType as Symbol("id"))
-        .write.parquet(s"$path/p=2")
-
-      val message = intercept[SparkException] {
-        spark.read.option("mergeSchema", "true").parquet(path).schema
-      }.getMessage
-
-      assert(message.contains("Failed merging schema"))
+      val df1 = spark.range(3)
+      df1.write.parquet(s"$path/p=1")
+      val df2 = spark.range(3).select($"id" cast IntegerType as Symbol("id"))
+      df2.write.parquet(s"$path/p=2")
+      checkError(
+        exception = intercept[SparkException] {
+          spark.read.option("mergeSchema", "true").parquet(path)
+        },
+        errorClass = "CANNOT_MERGE_SCHEMAS",
+        sqlState = "42KD9",
+        parameters = Map(
+          "left" -> toSQLType(df1.schema),
+          "right" -> toSQLType(df2.schema)))
     }
   }
 

