This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b93d4cfd49a [SPARK-53598][SQL] Check the existence of numParts before reading large table property
4b93d4cfd49a is described below

commit 4b93d4cfd49aee7f550597cfe6933c3f063c4a36
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Sep 18 17:30:23 2025 +0800

    [SPARK-53598][SQL] Check the existence of numParts before reading large table property
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a regression introduced by SPARK-33812 (https://github.com/apache/spark/commit/de234eec8febce99ede5ef9ae2301e36739a0f85, first shipped in Spark 3.2.0).
    
    We hit an error when upgrading Spark from 3.1 to 3.3. The table is a Hive SerDe Parquet table whose TBLPROPERTIES appear to be malformed (it is unclear how this happened): a `spark.sql.sources.schema.partCol.0` entry is present, but the `spark.sql.sources.schema.numParts` entry is missing. The table can be read and written normally with Spark 3.1, but reading it fails with Spark 3.3.
    
    ```
    -- Hive DDL
    CREATE EXTERNAL TABLE `foo`.`bar`(
      `id` bigint,
      ...
    ) PARTITIONED BY (`dt` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      'hdfs://...'
    TBLPROPERTIES (
      ...
      'spark.sql.sources.schema.partCol.0'='dt',
      'transient_lastDdlTime'='1727333678')
    ```
    
    ```
    org.apache.spark.sql.AnalysisException: Cannot read table property 'spark.sql.sources.schema' as it's corrupted.
            at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotReadCorruptedTablePropertyError(QueryCompilationErrors.scala:840) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.catalyst.catalog.CatalogTable$.$anonfun$readLargeTableProp$1(interface.scala:502) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
            at org.apache.spark.sql.catalyst.catalog.CatalogTable$.readLargeTableProp(interface.scala:497) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.getSchemaFromTableProperties(HiveExternalCatalog.scala:839) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.restoreHiveSerdeTable(HiveExternalCatalog.scala:809) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.restoreTableMetadata(HiveExternalCatalog.scala:765) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:544) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:529) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
            ...
    ```
    
    Before SPARK-33812, Spark skipped reading the schema from table properties if `spark.sql.sources.schema.numParts` was missing for a Hive SerDe table.
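
    For illustration, below is a minimal, self-contained Scala sketch of this logic (the object name and the error helper are hypothetical; the real implementation lives in `CatalogTable.readLargeTableProp`, changed in the diff below). It shows how a property split across `<key>.numParts` / `<key>.part.<index>` entries is reassembled, and how a missing `numParts` entry is now treated as "property not set" instead of an error:

    ```scala
    // Sketch only: simplified from the interface.scala change in this commit.
    object ReadLargeTablePropSketch {
      def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
        // A small value is stored directly under `key`; a large one is split into
        // "<key>.numParts" plus "<key>.part.0", "<key>.part.1", ...
        props.get(key).orElse {
          if (props.keys.exists(_.startsWith(key))) {
            props.get(s"$key.numParts") match {
              // Before this fix: a missing numParts key raised an
              // INSUFFICIENT_TABLE_PROPERTY error. Now the property is treated as
              // absent, matching the pre-3.2 behavior.
              case None => None
              case Some(numParts) =>
                val parts = (0 until numParts.toInt).map { index =>
                  props.getOrElse(s"$key.part.$index",
                    sys.error(s"missing part $index of $numParts"))
                }
                Some(parts.mkString)
            }
          } else {
            None
          }
        }
      }

      def main(args: Array[String]): Unit = {
        // Mimic the malformed TBLPROPERTIES above: a partCol entry exists but
        // numParts does not, so the schema lookup now falls back to None.
        val props = Map("spark.sql.sources.schema.partCol.0" -> "dt")
        assert(readLargeTableProp(props, "spark.sql.sources.schema").isEmpty)
      }
    }
    ```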
    
    ### Why are the changes needed?
    
    It restores the behavior of Spark versions before 3.2.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it restores the pre-3.2 behavior.
    
    ### How was this patch tested?
    
    A unit test is added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52355 from pan3793/SPARK-53598.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     |   2 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 118 ++++++++++++---------
 2 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index badfd0dfafb6..73d08eaa2b16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -767,7 +767,7 @@ object CatalogTable {
     props.get(key).orElse {
       if (props.exists { case (mapKey, _) => mapKey.startsWith(key) }) {
         props.get(s"$key.numParts") match {
-          case None => throw QueryCompilationErrors.insufficientTablePropertyError(key)
+          case None => None
           case Some(numParts) =>
             val parts = (0 until numParts.toInt).map { index =>
               val keyPart = s"$key.part.$index"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 9244776a2008..772db8dff615 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1378,62 +1378,78 @@ class MetastoreDataSourcesSuite extends QueryTest
   }
 
   test("read table with corrupted schema") {
-    try {
-      val schema = StructType(StructField("int", IntegerType) :: Nil)
-      val hiveTableWithoutNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY",
-        parameters = Map("key" -> toSQLConf("spark.sql.sources.schema"))
-      )
-
-      val hiveTableWithNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t2", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+    Seq(true, false).foreach { isHiveTable =>
+      try {
+        val schema = StructType(StructField("int", IntegerType) :: Nil)
+        val hiveTableWithoutNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+            if (isHiveTable) {
+              Map.empty
+            } else {
+              Map(DATASOURCE_PROVIDER -> "json")
+            }
+          })
 
-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t2")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
-        parameters = Map(
-          "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
-          "totalAmountOfParts" -> "3")
-      )
+        hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)
 
-      withDebugMode {
         val tableMeta = sharedState.externalCatalog.getTable("default", "t")
         assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
-        assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
-        val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
-        assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
-        assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+        assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+
+        val hiveTableWithNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t2", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+              if (isHiveTable) {
+                Map.empty
+              } else {
+                Map(DATASOURCE_PROVIDER -> "json")
+              }
+            })
+
+        hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sharedState.externalCatalog.getTable("default", "t2")
+          },
+          condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
+          parameters = Map(
+            "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
+            "totalAmountOfParts" -> "3")
+        )
+
+        withDebugMode {
+          val tableMeta = sharedState.externalCatalog.getTable("default", "t")
+          assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
+          }
+          val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
+          assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta2.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+          }
+        }
+      } finally {
+        hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
+        hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
       }
-    } finally {
-      hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
-      hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
