This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c1d0aedf3537 [SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive type incompatible partition columns
c1d0aedf3537 is described below

commit c1d0aedf35373330ad56af1b7eb4c1d2e2659c6f
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Mar 10 13:30:43 2025 +0800

[SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive type incompatible partition columns

### What changes were proposed in this pull request?

```
25/03/06 08:25:17 WARN HiveExternalCatalog: Hive incompatible types found: timestamp_ntz. Persisting data source table `spark_catalog`.`default`.`c` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: InvalidObjectException(message:Invalid partition column type: timestamp_ntz)
```

Partition column metadata is duplicated: it is stored both in the HMS column metadata and in the table properties. If the partition columns use Hive-incompatible data types, the HMS metadata API rejects the table, as shown above. We can rely on the table properties instead for reading and writing them.

### Why are the changes needed?

Bugfix; otherwise, newly added Spark data types such as `timestamp_ntz` cannot be used as partition columns.

### Does this PR introduce _any_ user-facing change?

Yes, more data types are supported as partition columns of partitioned data source tables stored in HMS.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50182 from yaooqinn/SPARK-51418.

Authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Kent Yao <y...@apache.org>
(cherry picked from commit f11bc758bafc23c1f9eaf89320c5765ac3a24eac)
Signed-off-by: Kent Yao <y...@apache.org>
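The gist of the fix can be sketched outside the Hive client itself: when the table is persisted, a partition column whose Spark type has no Hive equivalent is written to HMS with a placeholder type string, and when the table is read back, columns carrying that placeholder are discarded so the real schema can be restored from the table properties. The following is a minimal, self-contained sketch of that round trip; `HiveColumn`, `SparkColumn`, and the `hiveCompatible` set are invented for illustration and are not Spark or Hive APIs.

```scala
// Illustrative sketch only: stand-in types, not the real Spark/Hive classes.
object PartitionTypePlaceholderSketch {
  // Placeholder type string that HMS column validation accepts.
  val Placeholder = "<derived from deserializer>"

  // Hypothetical stand-in for Hive's FieldSchema: a name plus a type string.
  final case class HiveColumn(name: String, hiveType: String)

  // Hypothetical stand-in for a Spark column with its catalog type string.
  final case class SparkColumn(name: String, catalogType: String)

  // Assume, for this sketch, that only these types are representable in HMS.
  private val hiveCompatible = Set("int", "string", "timestamp", "date")

  // Write path: incompatible partition column types are replaced by the
  // placeholder so HMS validation passes; the real types stay in table properties.
  def toHiveColumns(partitionCols: Seq[SparkColumn]): Seq[HiveColumn] =
    partitionCols.map {
      case c if !hiveCompatible(c.catalogType) => HiveColumn(c.name, Placeholder)
      case c => HiveColumn(c.name, c.catalogType)
    }

  // Read path: placeholder columns are filtered out; the caller then restores
  // the full schema from the table properties instead.
  def fromHiveColumns(hiveCols: Seq[HiveColumn]): Seq[HiveColumn] =
    hiveCols.filterNot(_.hiveType == Placeholder)

  def main(args: Array[String]): Unit = {
    val parts = Seq(SparkColumn("a", "timestamp_ntz"))
    val stored = toHiveColumns(parts)
    println(stored)                  // List(HiveColumn(a,<derived from deserializer>))
    println(fromHiveColumns(stored)) // List() -> schema comes from table properties
  }
}
```

The actual change applies the same substitution with Hive's `FieldSchema` and `HiveExternalCatalog.isHiveCompatibleDataType`, as shown in the `HiveClientImpl.scala` hunks in the diff below.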
---
 .../analyzer-results/timestamp-ntz.sql.out         | 36 ++++++++++++++
 .../resources/sql-tests/inputs/timestamp-ntz.sql   |  6 +++
 .../sql-tests/results/timestamp-ntz.sql.out        | 56 ++++++++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     | 11 +++++
 .../spark/sql/hive/client/HiveClientImpl.scala     | 24 ++++++++--
 5 files changed, 129 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
index e92a392e22b6..9ab5b2445fc3 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
@@ -137,3 +137,39 @@ select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00'
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 -- !query analysis
 [Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`a`, false
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/a, [a=2018-11-17 13:33:33], false, [a#x], Parquet, [path=file:[not included in comparison]/{warehouse_dir}/a], Append, `spark_catalog`.`default`.`a`, org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included in comparison]/{warehouse_dir}/a), [b, a]
++- Project [b#x, cast(2018-11-17 13:33:33 as timestamp_ntz) AS a#x]
+   +- Project [cast(col1#x as int) AS b#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+DESC FORMATTED a
+-- !query analysis
+DescribeTableCommand `spark_catalog`.`default`.`a`, true, [col_name#x, data_type#x, comment#x]
+
+
+-- !query
+SELECT * FROM a
+-- !query analysis
+Project [b#x, a#x]
++- SubqueryAlias spark_catalog.default.a
+   +- Relation spark_catalog.default.a[b#x,a#x] parquet
+
+
+-- !query
+DROP TABLE a
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.a
diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
index 07901093cfba..7996f5879bf7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
@@ -31,3 +31,9 @@ select timestamp_ntz'2022-01-01 00:00:00' < date'2022-01-01';
 select timestamp_ntz'2022-01-01 00:00:00' = timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00';
+
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a);
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1);
+DESC FORMATTED a;
+SELECT * FROM a;
+DROP TABLE a;
diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
index 3a473dad828a..9e37bf4e9caa 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
@@ -173,3 +173,59 @@ select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 struct<(TIMESTAMP_NTZ '2022-01-01 00:00:00' < TIMESTAMP '2022-01-01 00:00:00'):boolean>
 -- !query output
 false
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DESC FORMATTED a
+-- !query schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query output
+b                            int
+a                            timestamp_ntz
+# Partition Information
+# col_name                   data_type           comment
+a                            timestamp_ntz
+
+# Detailed Table Information
+Catalog                      spark_catalog
+Database                     default
+Table                        a
+Created Time [not included in comparison]
+Last Access [not included in comparison]
+Created By [not included in comparison]
+Type                         MANAGED
+Provider                     parquet
+Location [not included in comparison]/{warehouse_dir}/a
+Partition Provider           Catalog
+
+
+-- !query
+SELECT * FROM a
+-- !query schema
+struct<b:int,a:timestamp_ntz>
+-- !query output
+1   2018-11-17 13:33:33
+
+
+-- !query
+DROP TABLE a
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d91d762048d2..21bdbd40caa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2418,6 +2418,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       )
     }
   }
+
+  test("SPARK-51418: Partitioned by Hive type incompatible columns") {
+    withTable("t1") {
+      sql("CREATE TABLE t1(a timestamp_ntz, b INTEGER) USING parquet PARTITIONED BY (a)")
+      sql("INSERT INTO t1 PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)")
VALUES (1)") + checkAnswer(sql("SELECT * FROM t1"), sql("select 1, timestamp_ntz'2018-11-17 13:33:33'")) + sql("ALTER TABLE t1 ADD COLUMN (c string)") + checkAnswer(sql("SELECT * FROM t1"), + sql("select 1, null, timestamp_ntz'2018-11-17 13:33:33'")) + } + } } object FakeLocalFsFileSystem { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 90f8a3a85d70..57f6f999b6ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -468,7 +468,9 @@ private[hive] class HiveClientImpl( // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema val (cols, partCols) = try { - (h.getCols.asScala.map(fromHiveColumn), h.getPartCols.asScala.map(fromHiveColumn)) + (h.getCols.asScala.map(fromHiveColumn), + h.getPartCols.asScala.filter(_.getType != INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER) + .map(fromHiveColumn)) } catch { case ex: SparkException => throw QueryExecutionErrors.convertHiveTableToCatalogTableError( @@ -1093,6 +1095,13 @@ private[hive] class HiveClientImpl( } private[hive] object HiveClientImpl extends Logging { + // We can not pass raw catalogString of Hive incompatible types to Hive metastore. + // For regular columns, we have already empty the schema and read/write using table properties. + // For partition columns, we need to set them to the hive table and also avoid verification + // failures from HMS. We use the TYPE_PLACEHOLDER below to bypass the verification. + // See org.apache.hadoop.hive.metastore.MetaStoreUtils#validateColumnType for more details. + + lazy val INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER = "<derived from deserializer>" /** Converts the native StructField to Hive's FieldSchema. */ def toHiveColumn(c: StructField): FieldSchema = { // For Hive Serde, we still need to to restore the raw type for char and varchar type. @@ -1167,10 +1176,17 @@ private[hive] object HiveClientImpl extends Logging { hiveTable.setProperty("EXTERNAL", "TRUE") } // Note: In Hive the schema and partition columns must be disjoint sets - val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => - table.partitionColumnNames.contains(c.getName) + val (partSchema, schema) = table.schema.partition { c => + table.partitionColumnNames.contains(c.name) + } + + val partCols = partSchema.map { + case c if !HiveExternalCatalog.isHiveCompatibleDataType(c.dataType) => + new FieldSchema(c.name, INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER, c.getComment().orNull) + case c => toHiveColumn(c) } - hiveTable.setFields(schema.asJava) + + hiveTable.setFields(schema.map(toHiveColumn).asJava) hiveTable.setPartCols(partCols.asJava) Option(table.owner).filter(_.nonEmpty).orElse(userName).foreach(hiveTable.setOwner) hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org