This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d812eec3e53 [HUDI-8315] update 
SqlKeyGenerator#convertPartitionPathToSqlType to handle default_partition_path 
(#12621)
d812eec3e53 is described below

commit d812eec3e53eeb99c6b9195e3c44e80a21cc262d
Author: karthick-de-25 <[email protected]>
AuthorDate: Sun Jan 19 06:57:06 2025 +0530

    [HUDI-8315] update SqlKeyGenerator#convertPartitionPathToSqlType to handle 
default_partition_path (#12621)
---
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   | 36 +++++++++++++---------
 .../TestSparkSqlWithTimestampKeyGenerator.scala    | 35 +++++++++++++++++++++
 2 files changed, 56 insertions(+), 15 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 740ac675868..0f21a9e16db 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.joda.time.format.DateTimeFormat
+import org.apache.hudi.common.util.PartitionPathEncodeUtils
 
 import java.sql.Timestamp
 import java.util
@@ -152,28 +153,33 @@ class SqlKeyGenerator(props: TypedProperties) extends 
BuiltinKeyGenerator(props)
       // in this case.
       if (partitionFragments.size != partitionSchema.get.size) {
         partitionPath
-      } else {
+      }
+      else {
         partitionFragments.zip(partitionSchema.get.fields).map {
           case (partitionValue, partitionField) =>
             val hiveStylePrefix = s"${partitionField.name}="
             val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
             val _partitionValue = if (isHiveStyle) 
partitionValue.substring(hiveStylePrefix.length) else partitionValue
-
-            partitionField.dataType match {
-              case TimestampType =>
-                val timeMs = if (rowType) { // In RowType, the 
partitionPathValue is the time format string, convert to millis
-                  
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
-                } else {
-                  if (isConsistentLogicalTimestampEnabled) {
-                    Timestamp.valueOf(_partitionValue).getTime
+            if (_partitionValue == 
PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH) {
+              partitionValue
+            }
+            else {
+              partitionField.dataType match {
+                case TimestampType =>
+                  val timeMs = if (rowType) { // In RowType, the 
partitionPathValue is the time format string, convert to millis
+                    
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
                   } else {
-                    MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+                    if (isConsistentLogicalTimestampEnabled) {
+                      Timestamp.valueOf(_partitionValue).getTime
+                    } else {
+                      MILLISECONDS.convert(_partitionValue.toLong, 
MICROSECONDS)
+                    }
                   }
-                }
-                val timestampFormat = PartitionPathEncodeUtils.escapePathName(
-                  SqlKeyGenerator.timestampTimeFormat.print(timeMs))
-                if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else 
timestampFormat
-              case _ => partitionValue
+                  val timestampFormat = 
PartitionPathEncodeUtils.escapePathName(
+                    SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+                  if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else 
timestampFormat
+                case _ => partitionValue
+              }
             }
         }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index e7b20318a00..49dcd1776eb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -100,6 +100,41 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
       }
     }
   }
+  test("Test Spark SQL with default partition path for timestamp key 
generator") {
+    withTempDir { tmp =>
+      val keyGeneratorSettings = timestampKeyGeneratorSettings(3)
+      val tsType = if (keyGeneratorSettings.contains("DATE_STRING")) "string" 
else "long"
+      spark.sql(
+        s"""
+           | CREATE TABLE test_default_path_ts (
+           |   id int,
+           |   name string,
+           |   precomb long,
+           |   ts TIMESTAMP
+           | ) USING HUDI
+           | LOCATION '${tmp.getCanonicalPath + "/test_default_path_ts"}'
+           | PARTITIONED BY (ts)
+           | TBLPROPERTIES (
+           |   type = 'COPY_ON_WRITE',
+           |   primaryKey = 'id',
+           |   preCombineField = 'precomb'
+           | )
+           |""".stripMargin)
+      val dataBatches =   Array(
+        "(1, 'a1', 1,TIMESTAMP '2025-01-15 01:02:03')",
+        "(2, 'a3', 1, null)"
+      )
+      val expectedQueryResult: String = "[1,a1,1,2025-01-15 01:02:03.0]; 
[2,a3,1,null]"
+      spark.sql(s"INSERT INTO test_default_path_ts VALUES ${dataBatches(0)}")
+      // inserting value with partition_timestamp value as null
+      spark.sql(s"INSERT INTO test_default_path_ts VALUES ${dataBatches(1)}")
+
+      val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM 
test_default_path_ts ORDER BY id").collect().mkString("; ")
+      LOG.warn(s"Query result: $queryResult")
+      assertResult(expectedQueryResult)(queryResult)
+
+    }
+  }
 
   test("Test mandatory partitioning for timestamp key generator") {
     withTempDir { tmp =>

Reply via email to