Voon Hou created HUDI-9519:
------------------------------
Summary: Writes with CustomKeyGenerator with TIMESTAMP as one of
the key generators not working
Key: HUDI-9519
URL: https://issues.apache.org/jira/browse/HUDI-9519
Project: Apache Hudi
Issue Type: Bug
Reporter: Voon Hou
Using a custom key generator throws an error when an upsert is performed:
{code:java}
test("Create MOR table with custom keygen partition field") {
withTempDir { tmp =>
val tableName = "hudi_custom_keygen_pt_v8_mor"
spark.sql(
s"""
|CREATE TABLE $tableName (
| id INT,
| name STRING,
| price DOUBLE,
| ts LONG,
| -- Partition Source Fields --
| part_country STRING,
| part_date BIGINT
|) USING hudi
| LOCATION '${tmp.getCanonicalPath}'
| TBLPROPERTIES (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'ts',
| -- Hive style partitioning needs to be disabled for timestamp
keygen to work --
| hoodie.datasource.write.hive_style_partitioning = 'false',
| -- Timestamp Keygen and Partition Configs --
| hoodie.table.keygenerator.class =
'org.apache.hudi.keygen.CustomKeyGenerator',
| hoodie.datasource.write.partitionpath.field =
'part_country:simple,part_date:timestamp',
| hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
| hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
| hoodie.keygen.timebased.timezone = 'UTC'
| ) PARTITIONED BY (part_country, part_date)
""".stripMargin)
// RecordKey + partition
// Configure Hudi properties
spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new
parquet file for each commit
spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
spark.sql(s"SET hoodie.metadata.enable=true")
spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")
// Insert data with new partition values
spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG',
1749284360000)")
spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG',
1749204000000)")
spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US',
1749202000000)")
spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN',
1749102000000)")
spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY',
1747102000000)")
// Generate logs through updates
spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")
spark.sql(s"SELECT * FROM $tableName").show(false)
}
}
{code}
Error:
{code:java}
Failed to cast value `2025-06-06` to `LongType` for partition column `part_date`
java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` for
partition column `part_date`
at
org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)