hudi-bot opened a new issue, #17054:
URL: https://github.com/apache/hudi/issues/17054
Using a custom key generator throws an error when an upsert is performed:
*Spark-SQL entrypoint:*
{code:java}
test("Create MOR table with custom keygen partition field") {
withTempDir { tmp =>
val tableName = "hudi_custom_keygen_pt_v8_mor"
spark.sql(
s"""
|CREATE TABLE $tableName (
| id INT,
| name STRING,
| price DOUBLE,
| ts LONG,
| -- Partition Source Fields --
| part_country STRING,
| part_date BIGINT
|) USING hudi
| LOCATION '${tmp.getCanonicalPath}'
| TBLPROPERTIES (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'ts',
| -- Hive style partitioning needs to be disabled for the timestamp keygen to work --
| hoodie.datasource.write.hive_style_partitioning = 'false',
| -- Timestamp Keygen and Partition Configs --
| hoodie.table.keygenerator.class = 'org.apache.hudi.keygen.CustomKeyGenerator',
| hoodie.datasource.write.partitionpath.field = 'part_country:simple,part_date:timestamp',
| hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
| hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
| hoodie.keygen.timebased.timezone = 'UTC'
| ) PARTITIONED BY (part_country, part_date)
""".stripMargin)
// RecordKey + partition
// Configure Hudi properties
spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new parquet file for each commit
spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
spark.sql(s"SET hoodie.metadata.enable=true")
spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")
// Insert data with new partition values
spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 1749284360000)")
spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 1749204000000)")
spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 1749202000000)")
spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 1749102000000)")
spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 1747102000000)")
// Generate logs through updates
spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")
spark.sql(s"SELECT * FROM $tableName").show(false)
}
}
{code}
*Datasource-write entrypoint:*
{code:java}
test("Create MOR table with custom keygen partition field") {
withTempDir { tmp =>
val tablePath = tmp.getCanonicalPath
val tableName = "hudi_custom_keygen_pt_v8_mor"
val hudiOptions = Map[String, String](
HoodieWriteConfig.TBL_NAME.key -> tableName,
DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
// Custom KeyGenerator and Partitioning Config
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.CustomKeyGenerator",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "part_country:SIMPLE,part_date:TIMESTAMP",
// Timestamp-based Partitioning Configs
"hoodie.keygen.timebased.timestamp.type" -> "EPOCHMILLISECONDS",
"hoodie.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
"hoodie.keygen.timebased.timezone" -> "UTC",
// Disable Hive-style partitioning for CustomKeyGenerator to format the date partition
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false",
// Other configs from the SQL SET commands
"hoodie.parquet.small.file.limit" -> "0",
"hoodie.metadata.compact.max.delta.commits" -> "1",
"hoodie.metadata.enable" -> "true",
"hoodie.metadata.index.column.stats.enable" -> "true"
)
spark.createDataFrame(Seq(
(1, "a1", 100.0, 1000L, "SG", 1749284360000L)
)).toDF("id", "name", "price", "ts", "part_country", "part_date").write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
spark.createDataFrame(Seq(
(2, "a2", 200.0, 1000L, "SG", 1749204000000L)
)).toDF("id", "name", "price", "ts", "part_country",
"part_date").write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
spark.createDataFrame(Seq(
(3, "a3", 101.0, 1001L, "US", 1749202000000L)
)).toDF("id", "name", "price", "ts", "part_country",
"part_date").write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
spark.createDataFrame(Seq(
(4, "a4", 201.0, 1001L, "CN", 1749102000000L)
)).toDF("id", "name", "price", "ts", "part_country",
"part_date").write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
spark.createDataFrame(Seq(
(5, "a5", 300.0, 1002L, "MY", 1747102000000L)
)).toDF("id", "name", "price", "ts", "part_country",
"part_date").write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
// Read the current state of the table
val hudiTableDF = spark.read.format("hudi").load(tablePath)
// Apply the update logic
val updatedDF = hudiTableDF.withColumn("price",
functions.round(col("price") * 1.02, 2))
// Write the updated data back
updatedDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(tablePath)
val finalDF = spark.read.format("hudi").load(tablePath)
// Select relevant Hudi meta fields and all original columns to verify the update
finalDF.select("_hoodie_commit_time", "id", "name", "price", "ts",
"part_country", "part_date")
.orderBy("id")
.show(false)
}
} {code}
Error:
{code:java}
Failed to cast value `2025-06-06` to `LongType` for partition column
`part_date`
java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType`
for partition column `part_date`
at
org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
{code}
The same code has been tested on 0.12.x, 0.14.x, 0.15.x, and 1.0.2; all of these versions exhibit the same issue.
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-9519
- Type: Bug
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]