This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fd3069ab113c [SPARK-49163][SQL] Attempt to create table based on broken parquet partition data results should return user-facing error
fd3069ab113c is described below

commit fd3069ab113c12a6dfa338abfd8c99acd707dfbd
Author: Nikola Mandic <[email protected]>
AuthorDate: Fri Aug 9 21:50:48 2024 +0800

    [SPARK-49163][SQL] Attempt to create table based on broken parquet partition data results should return user-facing error
    
    ### What changes were proposed in this pull request?
    
    Create an example parquet table with partitions and insert data in Spark:
    ```
    create table t(col1 string, col2 string, col3 string) using parquet location 'some/path/parquet-test' partitioned by (col1, col2);
    insert into t (col1, col2, col3) values ('a', 'b', 'c');
    ```
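    Equivalently, the same partitioned directory layout can be produced with the DataFrame writer (this is also what the new test below does). The following is only an illustrative sketch, not part of this patch: it assumes an active `SparkSession` (e.g. in `spark-shell`), reuses the hypothetical path from the example above, and does not register a catalog table:
    ```
    import spark.implicits._

    // Writes the same col1=a/col2=b layout to the hypothetical example path.
    Seq(("a", "b", "c")).toDF("col1", "col2", "col3")
      .write
      .partitionBy("col1", "col2")
      .parquet("some/path/parquet-test")
    ```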
    Go into the `parquet-test` path in the filesystem and copy the parquet data file from the `col1=a/col2=b` directory into `col1=a`. After that, try to create a new table from the parquet data in Spark:
    ```
    create table broken_table using parquet location 'some/path/parquet-test';
    ```
    This query fails with an internal error. Stack trace excerpts:
    ```
    org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
    ...
    Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
            Partition column name list #0: col1
            Partition column name list #1: col1, col2
    For partitioned table directories, data files should only live in leaf directories.
    And directories at the same level should have the same partition column name.
    Please check the following directories for unexpected files or inconsistent partition column names:
            file:some/path/parquet-test/col1=a
            file:some/path/parquet-test/col1=a/col2=b
      at scala.Predef$.assert(Predef.scala:279)
      at org.apache.spark.sql.execution.datasources.PartitioningUtils$.resolvePartitions(PartitioningUtils.scala:391)
    ...
    ```
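    For completeness, the manual copy step above can also be scripted. Here is a minimal sketch (hypothetical helper code, not part of this patch) that copies the data file from the leaf directory `col1=a/col2=b` up into `col1=a`, assuming the table location is on the local filesystem:
    ```
    import java.nio.file.{Files, Paths, StandardCopyOption}
    import scala.jdk.CollectionConverters._

    val tablePath = Paths.get("some/path/parquet-test")  // hypothetical location
    val leafDir = tablePath.resolve("col1=a/col2=b")
    val nonLeafDir = tablePath.resolve("col1=a")

    // Copy a parquet data file one level up, so that a non-leaf partition
    // directory now contains data files and partition discovery conflicts.
    val dataFile = Files.list(leafDir).iterator().asScala
      .find(_.getFileName.toString.endsWith(".parquet"))
      .get
    Files.copy(dataFile, nonLeafDir.resolve(dataFile.getFileName),
      StandardCopyOption.REPLACE_EXISTING)
    ```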
    Fix this by converting the internal error into a user-facing error.
    
    ### Why are the changes needed?
    
    Replaces the internal error with a user-facing one for a valid sequence of Spark SQL operations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the user now sees a regular user-facing error instead of an internal error.
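
    As an illustration of the behavioral change (a sketch, not part of this patch; the path is hypothetical and `spark` is assumed to be an active `SparkSession`), callers that previously had to catch a raw `AssertionError` can now match on the error class of the thrown `SparkRuntimeException`:
    ```
    import org.apache.spark.SparkRuntimeException

    try {
      // Hypothetical path pointing at a corrupted partition layout.
      spark.read.parquet("some/path/parquet-test").show()
    } catch {
      case e: SparkRuntimeException
          if e.getErrorClass == "CONFLICTING_PARTITION_COLUMN_NAMES" =>
        // Surface an actionable message instead of an internal error.
        println(e.getMessage)
    }
    ```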
    
    ### How was this patch tested?
    
    Added checks to `ParquetPartitionDiscoverySuite` that simulate the described scenario by manually breaking the parquet table layout in the filesystem.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47668 from nikolamand-db/SPARK-49163.
    
    Authored-by: Nikola Mandic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  11 +++
 .../spark/sql/errors/QueryExecutionErrors.scala    |  12 +++
 .../execution/datasources/PartitioningUtils.scala  |  20 ++--
 .../sql/execution/datasources/FileIndexSuite.scala |   2 +-
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 110 ++++++++++++++-------
 5 files changed, 108 insertions(+), 47 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 4766c7790915..3512fe34e92a 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -625,6 +625,17 @@
     ],
     "sqlState" : "40000"
   },
+  "CONFLICTING_PARTITION_COLUMN_NAMES" : {
+    "message" : [
+      "Conflicting partition column names detected:",
+      "<distinctPartColLists>",
+      "For partitioned table directories, data files should only live in leaf 
directories.",
+      "And directories at the same level should have the same partition column 
name.",
+      "Please check the following directories for unexpected files or 
inconsistent partition column names:",
+      "<suspiciousPaths>"
+    ],
+    "sqlState" : "KD009"
+  },
   "CONNECT" : {
     "message" : [
       "Generic Spark Connect error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9bfb81ad821b..eb25387af5a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "parameter" -> toSQLId("unit"),
         "invalidValue" -> s"'$invalidValue'"))
   }
+
+  def conflictingPartitionColumnNamesError(
+      distinctPartColLists: Seq[String],
+      suspiciousPaths: Seq[Path]): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      messageParameters = Map(
+        "distinctPartColLists" -> distinctPartColLists.mkString("\n\t", 
"\n\t", "\n"),
+        "suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n", 
"\n", "")
+      )
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 3b2d601b81fb..676a2ab64d0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -386,9 +387,9 @@ object PartitioningUtils extends SQLConfHelper {
       } else {
         pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
       }
-      assert(
-        partColNames.distinct.size == 1,
-        listConflictingPartitionColumns(pathsWithPartitionValues))
+      if (partColNames.distinct.size != 1) {
+        throw conflictingPartitionColumnsError(pathsWithPartitionValues)
+      }
 
       // Resolves possible type conflicts for each column
       val values = pathsWithPartitionValues.map(_._2)
@@ -404,8 +405,8 @@ object PartitioningUtils extends SQLConfHelper {
     }
   }
 
-  private[datasources] def listConflictingPartitionColumns(
-      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+  private[datasources] def conflictingPartitionColumnsError(
+      pathWithPartitionValues: Seq[(Path, PartitionValues)]): SparkRuntimeException = {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
 
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
@@ -423,13 +424,8 @@ object PartitioningUtils extends SQLConfHelper {
     // Lists out those non-leaf partition directories that also contain files
     val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
 
-    s"Conflicting partition column names detected:\n" +
-      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
-      "For partitioned table directories, data files should only live in leaf 
directories.\n" +
-      "And directories at the same level should have the same partition column 
name.\n" +
-      "Please check the following directories for unexpected files or " +
-      "inconsistent partition column names:\n" +
-      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+    QueryExecutionErrors.conflictingPartitionColumnNamesError(
+      distinctPartColLists, suspiciousPaths)
   }
 
   // scalastyle:off line.size.limit
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6399eb6da049..21623f94c8ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession {
       }
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-        val msg = intercept[AssertionError] {
+        val msg = intercept[SparkRuntimeException] {
           val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
           fileIndex.partitionSpec()
         }.getMessage
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index a6ad147c865d..1484511a98b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkRuntimeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite
     }
   }
 
-  test("listConflictingPartitionColumns") {
-    def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
-      val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
-        s"\tPartition column name list #$index: $list"
-      }.mkString("\n", "\n", "\n")
-
-      // scalastyle:off
-      s"""Conflicting partition column names detected:
-         |$conflictingColNameLists
-         |For partitioned table directories, data files should only live in leaf directories.
-         |And directories at the same level should have the same partition column name.
-         |Please check the following directories for unexpected files or inconsistent partition column names:
-         |${paths.map("\t" + _).mkString("\n", "\n", "")}
-       """.stripMargin.trim
-      // scalastyle:on
-    }
-
-    assert(
-      listConflictingPartitionColumns(
+  test("conflictingPartitionColumnsError") {
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/b=1"),
-            PartitionValues(Seq("b"), Seq(TypedPartValue("1", 
IntegerType)))))).trim ===
-        makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", 
"file:/tmp/foo/b=1")))
+            PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n\tPartition column name list 
#1: b\n",
+        "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1"
+      )
+    )
 
-    assert(
-      listConflictingPartitionColumns(
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1/_temporary"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/a=1"),
-            PartitionValues(Seq("a"), Seq(TypedPartValue("1", 
IntegerType)))))).trim ===
-        makeExpectedMessage(
-          Seq("a"),
-          Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+            PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n",
+        "suspiciousPaths" -> 
"\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1"
+      )
+    )
 
-    assert(
-      listConflictingPartitionColumns(
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/a=1/b=foo"),
             PartitionValues(Seq("a", "b"),
-              Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", 
StringType)))))).trim ===
-        makeExpectedMessage(
-          Seq("a", "a, b"),
-          Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+              Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", 
StringType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n\tPartition column name list 
#1: a, b\n",
+        "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo"
+      )
+    )
   }
 
   test("Parallel partition discovery") {
@@ -1145,6 +1149,44 @@ abstract class ParquetPartitionDiscoverySuite
       checkAnswer(res, Seq(Row(1, 2, 3, 4.0f)))
     }
   }
+
+  test("SPARK-49163: Attempt to create table based on broken parquet partition 
data") {
+    withTempDir { dir =>
+      val data = Seq[(String, String, String)](("a", "b", "c"))
+      data
+        .toDF("col1", "col2", "col3")
+        .write
+        .mode("overwrite")
+        .partitionBy("col1", "col2")
+        .parquet(dir.getCanonicalPath)
+
+      // Structure of parquet table in filesystem:
+      // <base>
+      // +- col1=a
+      //    +- col2=b
+      //       |- part-00000.parquet
+
+      val partition = new File(dir, "col1=a")
+      val dummyData = new File(partition, "dummy")
+      dummyData.createNewFile()
+
+      // Structure of parquet table in filesystem is now corrupt:
+      // <base>
+      // +- col1=a
+      //    |- dummy
+      //    +- col2=b
+      //       |- part-00000.parquet
+
+      val exception = intercept[SparkRuntimeException] {
+        spark.read.parquet(dir.toString)
+      }
+      val msg = exception.getMessage
+      assert(exception.getErrorClass === "CONFLICTING_PARTITION_COLUMN_NAMES")
+      // Partitions inside the error message can be presented in any order
+      assert("Partition column name list #[0-1]: 
col1".r.findFirstIn(msg).isDefined)
+      assert("Partition column name list #[0-1]: col1, 
col2".r.findFirstIn(msg).isDefined)
+    }
+  }
 }
 
 class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
