This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d812eec3e53 [HUDI-8315] update SqlKeyGenerator#convertPartitionPathToSqlType to handle default_partition_path (#12621)
d812eec3e53 is described below
commit d812eec3e53eeb99c6b9195e3c44e80a21cc262d
Author: karthick-de-25 <[email protected]>
AuthorDate: Sun Jan 19 06:57:06 2025 +0530
[HUDI-8315] update SqlKeyGenerator#convertPartitionPathToSqlType to handle default_partition_path (#12621)
---
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 36 +++++++++++++---------
.../TestSparkSqlWithTimestampKeyGenerator.scala | 35 +++++++++++++++++++++
2 files changed, 56 insertions(+), 15 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 740ac675868..0f21a9e16db 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.joda.time.format.DateTimeFormat
+import org.apache.hudi.common.util.PartitionPathEncodeUtils
import java.sql.Timestamp
import java.util
@@ -152,28 +153,33 @@ class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props)
// in this case.
if (partitionFragments.size != partitionSchema.get.size) {
partitionPath
- } else {
+ }
+ else {
partitionFragments.zip(partitionSchema.get.fields).map {
case (partitionValue, partitionField) =>
val hiveStylePrefix = s"${partitionField.name}="
val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
          val _partitionValue = if (isHiveStyle) partitionValue.substring(hiveStylePrefix.length) else partitionValue
-
- partitionField.dataType match {
- case TimestampType =>
-          val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
-            SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
- } else {
- if (isConsistentLogicalTimestampEnabled) {
- Timestamp.valueOf(_partitionValue).getTime
            if (_partitionValue == PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH) {
+ partitionValue
+ }
+ else {
+ partitionField.dataType match {
+ case TimestampType =>
+                val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
+                  SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
} else {
- MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+ if (isConsistentLogicalTimestampEnabled) {
+ Timestamp.valueOf(_partitionValue).getTime
+ } else {
+                    MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+ }
}
- }
- val timestampFormat = PartitionPathEncodeUtils.escapePathName(
- SqlKeyGenerator.timestampTimeFormat.print(timeMs))
-          if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
- case _ => partitionValue
+                val timestampFormat = PartitionPathEncodeUtils.escapePathName(
+ SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+                if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
+ case _ => partitionValue
+ }
}
}.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index e7b20318a00..49dcd1776eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -100,6 +100,41 @@ class TestSparkSqlWithTimestampKeyGenerator extends HoodieSparkSqlTestBase {
}
}
}
+  test("Test Spark SQL with default partition path for timestamp key generator") {
+ withTempDir { tmp =>
+ val keyGeneratorSettings = timestampKeyGeneratorSettings(3)
+      val tsType = if (keyGeneratorSettings.contains("DATE_STRING")) "string" else "long"
+ spark.sql(
+ s"""
+ | CREATE TABLE test_default_path_ts (
+ | id int,
+ | name string,
+ | precomb long,
+ | ts TIMESTAMP
+ | ) USING HUDI
+ | LOCATION '${tmp.getCanonicalPath + "/test_default_path_ts"}'
+ | PARTITIONED BY (ts)
+ | TBLPROPERTIES (
+ | type = 'COPY_ON_WRITE',
+ | primaryKey = 'id',
+ | preCombineField = 'precomb'
+ | )
+ |""".stripMargin)
+ val dataBatches = Array(
+ "(1, 'a1', 1,TIMESTAMP '2025-01-15 01:02:03')",
+ "(2, 'a3', 1, null)"
+ )
+      val expectedQueryResult: String = "[1,a1,1,2025-01-15 01:02:03.0]; [2,a3,1,null]"
+ spark.sql(s"INSERT INTO test_default_path_ts VALUES ${dataBatches(0)}")
+ // inserting value with partition_timestamp value as null
+ spark.sql(s"INSERT INTO test_default_path_ts VALUES ${dataBatches(1)}")
+
+      val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM test_default_path_ts ORDER BY id").collect().mkString("; ")
+ LOG.warn(s"Query result: $queryResult")
+ assertResult(expectedQueryResult)(queryResult)
+
+ }
+ }
test("Test mandatory partitioning for timestamp key generator") {
withTempDir { tmp =>