This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b93d4cfd49a  [SPARK-53598][SQL] Check the existence of numParts before reading large table property
4b93d4cfd49a is described below

commit 4b93d4cfd49aee7f550597cfe6933c3f063c4a36
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Sep 18 17:30:23 2025 +0800

[SPARK-53598][SQL] Check the existence of numParts before reading large table property

### What changes were proposed in this pull request?

This PR proposes to fix a regression caused by SPARK-33812 (https://github.com/apache/spark/commit/de234eec8febce99ede5ef9ae2301e36739a0f85, Spark 3.2.0).

We hit an error when upgrading Spark from 3.1 to 3.3. The table is a Hive SerDe Parquet table whose TBLPROPERTIES look malformed (we are not sure how this happened); the table can be read and written normally with Spark 3.1, but reading it fails with Spark 3.3.

```
-- Hive DDL
CREATE EXTERNAL TABLE `foo`.`bar`(
  `id` bigint,
  ...
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://...'
TBLPROPERTIES (
  ...
  'spark.sql.sources.schema.partCol.0'='dt',
  'transient_lastDdlTime'='1727333678')
```

```
org.apache.spark.sql.AnalysisException: Cannot read table property 'spark.sql.sources.schema' as it's corrupted.
  at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotReadCorruptedTablePropertyError(QueryCompilationErrors.scala:840) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.CatalogTable$.$anonfun$readLargeTableProp$1(interface.scala:502) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
  at org.apache.spark.sql.catalyst.catalog.CatalogTable$.readLargeTableProp(interface.scala:497) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.getSchemaFromTableProperties(HiveExternalCatalog.scala:839) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.restoreHiveSerdeTable(HiveExternalCatalog.scala:809) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.restoreTableMetadata(HiveExternalCatalog.scala:765) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:544) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:529) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  ...
```

Before SPARK-33812, Spark skipped reading the schema from table properties if `spark.sql.sources.schema.numParts` was missing for a Hive SerDe table.

### Why are the changes needed?

Restore the behavior before Spark 3.2.
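For illustration, here is a minimal, self-contained sketch of the lookup behavior this change restores, simplified from the `CatalogTable.readLargeTableProp` change in the diff below. The object name and `main` wrapper are invented for the example, and the real code's error handling for missing `part.N` entries is elided: with only `spark.sql.sources.schema.partCol.0` present and no `spark.sql.sources.schema.numParts`, the schema property now resolves to `None` instead of raising the corrupted-property error.

```
// Hypothetical standalone sketch; the real logic lives in
// org.apache.spark.sql.catalyst.catalog.CatalogTable.readLargeTableProp.
object ReadLargeTablePropSketch {
  def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
    props.get(key).orElse {
      if (props.exists { case (mapKey, _) => mapKey.startsWith(key) }) {
        props.get(s"$key.numParts") match {
          // Before this fix: threw "Cannot read table property ... as it's corrupted."
          // After this fix: treat the large property as absent and fall back.
          case None => None
          case Some(numParts) =>
            // Reassemble the split value; missing parts are silently skipped here,
            // whereas the real code reports them as corrupted.
            val parts = (0 until numParts.toInt).flatMap(i => props.get(s"$key.part.$i"))
            Some(parts.mkString)
        }
      } else {
        None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // TBLPROPERTIES shaped like the malformed table above: a schema-related key
    // exists, but `spark.sql.sources.schema.numParts` does not.
    val props = Map("spark.sql.sources.schema.partCol.0" -> "dt")
    // Prints "None": the schema property is skipped instead of failing the table read.
    println(readLargeTableProp(props, "spark.sql.sources.schema"))
  }
}
```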
### Does this PR introduce _any_ user-facing change?

Yes, restores behavior before Spark 3.2.

### How was this patch tested?

UT is added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52355 from pan3793/SPARK-53598.

Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     |   2 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 118 ++++++++++++---------
 2 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index badfd0dfafb6..73d08eaa2b16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -767,7 +767,7 @@ object CatalogTable {
     props.get(key).orElse {
       if (props.exists { case (mapKey, _) => mapKey.startsWith(key) }) {
         props.get(s"$key.numParts") match {
-          case None => throw QueryCompilationErrors.insufficientTablePropertyError(key)
+          case None => None
           case Some(numParts) =>
             val parts = (0 until numParts.toInt).map { index =>
               val keyPart = s"$key.part.$index"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 9244776a2008..772db8dff615 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1378,62 +1378,78 @@ class MetastoreDataSourcesSuite extends QueryTest
   }
 
   test("read table with corrupted schema") {
-    try {
-      val schema = StructType(StructField("int", IntegerType) :: Nil)
-      val hiveTableWithoutNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY",
-        parameters = Map("key" -> toSQLConf("spark.sql.sources.schema"))
-      )
-
-      val hiveTableWithNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t2", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+    Seq(true, false).foreach { isHiveTable =>
+      try {
+        val schema = StructType(StructField("int", IntegerType) :: Nil)
+        val hiveTableWithoutNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+            if (isHiveTable) {
+              Map.empty
+            } else {
+              Map(DATASOURCE_PROVIDER -> "json")
+            }
+          })
 
-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t2")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
-        parameters = Map(
-          "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
-          "totalAmountOfParts" -> "3")
-      )
+        hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)
 
-      withDebugMode {
         val tableMeta = sharedState.externalCatalog.getTable("default", "t")
         assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
-        assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
-        val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
-        assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
-        assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+        assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+
+        val hiveTableWithNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t2", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+            if (isHiveTable) {
+              Map.empty
+            } else {
+              Map(DATASOURCE_PROVIDER -> "json")
+            }
+          })
+
+        hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sharedState.externalCatalog.getTable("default", "t2")
+          },
+          condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
+          parameters = Map(
+            "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
+            "totalAmountOfParts" -> "3")
+        )
+
+        withDebugMode {
+          val tableMeta = sharedState.externalCatalog.getTable("default", "t")
+          assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
+          }
+          val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
+          assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta2.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+          }
+        }
+      } finally {
+        hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
+        hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
       }
-    } finally {
-      hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
-      hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org