Voon Hou created HUDI-9519:
------------------------------

             Summary: Writes with CustomKeyGenerator with TIMESTAMP as one of KEY gens not working
                 Key: HUDI-9519
                 URL: https://issues.apache.org/jira/browse/HUDI-9519
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Voon Hou


 

Using the custom key generator throws an error when an upsert is performed:

 

 
{code:java}
test("Create MOR table with custom keygen partition field") {
  withTempDir { tmp =>
    val tableName = "hudi_custom_keygen_pt_v8_mor"

    spark.sql(
      s"""
         |CREATE TABLE $tableName (
         |  id INT,
         |  name STRING,
         |  price DOUBLE,
         |  ts LONG,
         |  -- Partition Source Fields --
         |  part_country STRING,
         |  part_date BIGINT
         |) USING hudi
         | LOCATION '${tmp.getCanonicalPath}'
         | TBLPROPERTIES (
         |  primaryKey = 'id',
         |  type = 'mor',
         |  preCombineField = 'ts',
         |  -- Hive style partitioning needs to be disabled for timestamp keygen to work --
         |  hoodie.datasource.write.hive_style_partitioning = 'false',
         |  -- Timestamp Keygen and Partition Configs --
         |  hoodie.table.keygenerator.class = 'org.apache.hudi.keygen.CustomKeyGenerator',
         |  hoodie.datasource.write.partitionpath.field = 'part_country:simple,part_date:timestamp',
         |  hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
         |  hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
         |  hoodie.keygen.timebased.timezone = 'UTC'
         | ) PARTITIONED BY (part_country, part_date)
   """.stripMargin)

    // RecordKey + partition

    // Configure Hudi properties
    spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new parquet file for each commit
    spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
    spark.sql(s"SET hoodie.metadata.enable=true")
    spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")

    // Insert data with new partition values
    spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 1749284360000)")
    spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 1749204000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 1749202000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 1749102000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 1747102000000)")

    // Generate logs through updates
    spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")

    spark.sql(s"SELECT * FROM $tableName").show(false)
  }
}
{code}
 

Error:
{code:java}
Failed to cast value `2025-06-06` to `LongType` for partition column `part_date`
java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` for partition column `part_date`
        at org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to