This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f11bc758bafc [SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive type incompatible partition columns
f11bc758bafc is described below

commit f11bc758bafc23c1f9eaf89320c5765ac3a24eac
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Mar 10 13:30:43 2025 +0800

    [SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive type incompatible partition columns
    
    ### What changes were proposed in this pull request?
    
    ```
    25/03/06 08:25:17 WARN HiveExternalCatalog: Hive incompatible types found: timestamp_ntz. Persisting data source table `spark_catalog`.`default`.`c` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
    org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: InvalidObjectException(message:Invalid partition column type: timestamp_ntz)
    ```
    
    The partition columns are duplicated: they are stored both in the HMS column metadata and in the table properties. If they contain Hive-incompatible data types, the HMS metadata API rejects the operation.
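
    For example, a minimal repro, mirroring the golden-file tests added below, which previously hit the error above:

    ```
    CREATE TABLE a (a timestamp_ntz, b int) USING parquet PARTITIONED BY (a);
    INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1);
    SELECT * FROM a;
    DROP TABLE a;
    ```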
    
    We can instead rely on the copy kept in the table properties for reading and writing the partition schema.
    
    ### Why are the changes needed?
    Bugfix; otherwise, newly added Spark data types cannot be used as partition columns of data source tables.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, more data types are supported as partition columns for data source tables stored in HMS.
    
    ### How was this patch tested?
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #50182 from yaooqinn/SPARK-51418.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../analyzer-results/timestamp-ntz.sql.out         | 36 ++++++++++++++
 .../resources/sql-tests/inputs/timestamp-ntz.sql   |  6 +++
 .../sql-tests/results/timestamp-ntz.sql.out        | 56 ++++++++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     | 11 +++++
 .../spark/sql/hive/client/HiveClientImpl.scala     | 24 ++++++++--
 5 files changed, 129 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
index e92a392e22b6..9ab5b2445fc3 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
@@ -137,3 +137,39 @@ select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00'
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 -- !query analysis
 [Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`a`, false
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/a, [a=2018-11-17 13:33:33], false, [a#x], Parquet, [path=file:[not included in comparison]/{warehouse_dir}/a], Append, `spark_catalog`.`default`.`a`, org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included in comparison]/{warehouse_dir}/a), [b, a]
++- Project [b#x, cast(2018-11-17 13:33:33 as timestamp_ntz) AS a#x]
+   +- Project [cast(col1#x as int) AS b#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+DESC FORMATTED a
+-- !query analysis
+DescribeTableCommand `spark_catalog`.`default`.`a`, true, [col_name#x, data_type#x, comment#x]
+
+
+-- !query
+SELECT * FROM a
+-- !query analysis
+Project [b#x, a#x]
++- SubqueryAlias spark_catalog.default.a
+   +- Relation spark_catalog.default.a[b#x,a#x] parquet
+
+
+-- !query
+DROP TABLE a
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.a
diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
index 07901093cfba..7996f5879bf7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
@@ -31,3 +31,9 @@ select timestamp_ntz'2022-01-01 00:00:00' < date'2022-01-01';
 select timestamp_ntz'2022-01-01 00:00:00' = timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00';
+
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a);
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1);
+DESC FORMATTED a;
+SELECT * FROM a;
+DROP TABLE a;
diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
index 3a473dad828a..9e37bf4e9caa 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
@@ -173,3 +173,59 @@ select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 struct<(TIMESTAMP_NTZ '2022-01-01 00:00:00' < TIMESTAMP '2022-01-01 00:00:00'):boolean>
 -- !query output
 false
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DESC FORMATTED a
+-- !query schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query output
+b                      int                                         
+a                      timestamp_ntz                               
+# Partition Information                                                    
+# col_name             data_type               comment             
+a                      timestamp_ntz                               
+                                                                   
+# Detailed Table Information                                               
+Catalog                spark_catalog                               
+Database               default                                     
+Table                  a                                           
+Created Time [not included in comparison]
+Last Access [not included in comparison]
+Created By [not included in comparison]
+Type                   MANAGED                                     
+Provider               parquet                                     
+Location [not included in comparison]/{warehouse_dir}/a
+Partition Provider     Catalog
+
+
+-- !query
+SELECT * FROM a
+-- !query schema
+struct<b:int,a:timestamp_ntz>
+-- !query output
+1      2018-11-17 13:33:33
+
+
+-- !query
+DROP TABLE a
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d91d762048d2..21bdbd40caa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2418,6 +2418,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       )
     }
   }
+
+  test("SPARK-51418: Partitioned by Hive type incompatible columns") {
+    withTable("t1") {
+      sql("CREATE TABLE t1(a timestamp_ntz, b INTEGER) USING parquet 
PARTITIONED BY (a)")
+      sql("INSERT INTO t1 PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') 
VALUES (1)")
+      checkAnswer(sql("SELECT * FROM t1"), sql("select 1, 
timestamp_ntz'2018-11-17 13:33:33'"))
+      sql("ALTER TABLE t1 ADD COLUMN (c string)")
+      checkAnswer(sql("SELECT * FROM t1"),
+        sql("select 1, null, timestamp_ntz'2018-11-17 13:33:33'"))
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 90f8a3a85d70..57f6f999b6ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -468,7 +468,9 @@ private[hive] class HiveClientImpl(
     // Note: Hive separates partition columns and the schema, but for us the
     // partition columns are part of the schema
     val (cols, partCols) = try {
-      (h.getCols.asScala.map(fromHiveColumn), h.getPartCols.asScala.map(fromHiveColumn))
+      (h.getCols.asScala.map(fromHiveColumn),
+        h.getPartCols.asScala.filter(_.getType != INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER)
+          .map(fromHiveColumn))
     } catch {
       case ex: SparkException =>
         throw QueryExecutionErrors.convertHiveTableToCatalogTableError(
@@ -1093,6 +1095,13 @@ private[hive] class HiveClientImpl(
 }
 
 private[hive] object HiveClientImpl extends Logging {
+  // We cannot pass the raw catalogString of Hive-incompatible types to the Hive metastore.
+  // For regular columns, we have already emptied the schema and read/write it using table properties.
+  // For partition columns, we need to set them on the Hive table and also avoid verification
+  // failures from HMS. We use the TYPE_PLACEHOLDER below to bypass the verification.
+  // See org.apache.hadoop.hive.metastore.MetaStoreUtils#validateColumnType for more details.
+
+  lazy val INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER = "<derived from deserializer>"
   /** Converts the native StructField to Hive's FieldSchema. */
   def toHiveColumn(c: StructField): FieldSchema = {
     // For Hive Serde, we still need to restore the raw type for char and varchar type.
@@ -1167,10 +1176,17 @@ private[hive] object HiveClientImpl extends Logging {
       hiveTable.setProperty("EXTERNAL", "TRUE")
     }
     // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
+    val (partSchema, schema) = table.schema.partition { c =>
+      table.partitionColumnNames.contains(c.name)
+    }
+
+    val partCols = partSchema.map {
+      case c if !HiveExternalCatalog.isHiveCompatibleDataType(c.dataType) =>
+        new FieldSchema(c.name, INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER, c.getComment().orNull)
+      case c => toHiveColumn(c)
     }
-    hiveTable.setFields(schema.asJava)
+
+    hiveTable.setFields(schema.map(toHiveColumn).asJava)
     hiveTable.setPartCols(partCols.asJava)
     Option(table.owner).filter(_.nonEmpty).orElse(userName).foreach(hiveTable.setOwner)
     hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)

