[ 
https://issues.apache.org/jira/browse/HUDI-9519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Voon Hou updated HUDI-9519:
---------------------------
    Description: 
 

Using custom key generator throws an error when upsert is being performed:

*Spark-SQL entrypoint:*
{code:java}
test("Create MOR table with custom keygen partition field") {
  withTempDir { tmp =>
    val tableName = "hudi_custom_keygen_pt_v8_mor"

    spark.sql(
      s"""
         |CREATE TABLE $tableName (
         |  id INT,
         |  name STRING,
         |  price DOUBLE,
         |  ts LONG,
         |  -- Partition Source Fields --
         |  part_country STRING,
         |  part_date BIGINT
         |) USING hudi
         | LOCATION '${tmp.getCanonicalPath}'
         | TBLPROPERTIES (
         |  primaryKey = 'id',
         |  type = 'mor',
         |  preCombineField = 'ts',
         |  -- Hive style partitioning needs to be disabled for timestamp 
keygen to work --
         |  hoodie.datasource.write.hive_style_partitioning = 'false',
         |  -- Timestamp Keygen and Partition Configs --
         |  hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.CustomKeyGenerator',
         |  hoodie.datasource.write.partitionpath.field = 
'part_country:simple,part_date:timestamp',
         |  hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
         |  hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
         |  hoodie.keygen.timebased.timezone = 'UTC'
         | ) PARTITIONED BY (part_country, part_date)
   """.stripMargin)

    // RecordKey + partition

    // Configure Hudi properties
    spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new 
parquet file for each commit
    spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
    spark.sql(s"SET hoodie.metadata.enable=true")
    spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")

    // Insert data with new partition values
    spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 
1749284360000)")
    spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 
1749204000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 
1749202000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 
1749102000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 
1747102000000)")

    // Generate logs through updates
    spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")

    spark.sql(s"SELECT * FROM $tableName").show(false)
  }
}
{code}
 

Datasource-write entrypoint:
{code:java}
test("Create MOR table with custom keygen partition field") {
    withTempDir { tmp =>
      val tablePath = tmp.getCanonicalPath
      val tableName = "hudi_custom_keygen_pt_v8_mor"

      val hudiOptions = Map[String, String](
        HoodieWriteConfig.TBL_NAME.key -> tableName,
        DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",

        // Custom KeyGenerator and Partitioning Config
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.CustomKeyGenerator",
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> 
"part_country:SIMPLE,part_date:TIMESTAMP",

        // Timestamp-based Partitioning Configs
        "hoodie.keygen.timebased.timestamp.type" -> "EPOCHMILLISECONDS",
        "hoodie.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
        "hoodie.keygen.timebased.timezone" -> "UTC",

        // Disable Hive-style partitioning for CustomKeyGenerator to format the 
date partition
        DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false",

        // Other configs from the SQL SET commands
        "hoodie.parquet.small.file.limit" -> "0",
        "hoodie.metadata.compact.max.delta.commits" -> "1",
        "hoodie.metadata.enable" -> "true",
        "hoodie.metadata.index.column.stats.enable" -> "true"
      )

      spark.createDataFrame(Seq(
        (1, "a1", 100.0, 1000L, "SG", 1749284360000L)
      )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (2, "a2", 200.0, 1000L, "SG", 1749204000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (3, "a3", 101.0, 1001L, "US", 1749202000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (4, "a4", 201.0, 1001L, "CN", 1749102000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (5, "a5", 300.0, 1002L, "MY", 1747102000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      // Read the current state of the table
      val hudiTableDF = spark.read.format("hudi").load(tablePath)

      // Apply the update logic
      val updatedDF = hudiTableDF.withColumn("price", 
functions.round(col("price") * 1.02, 2))

      // Write the updated data back
      updatedDF.write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      val finalDF = spark.read.format("hudi").load(tablePath)

      // Select relevant Hudi meta fields and all original columns to verify 
the update
      finalDF.select("_hoodie_commit_time", "id", "name", "price", "ts", 
"part_country", "part_date")
        .orderBy("id")
        .show(false)

    }
  } {code}
 

Error:
{code:java}
Failed to cast value `2025-06-06` to `LongType` for partition column `part_date`
java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` for 
partition column `part_date`
        at 
org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108) 
{code}
 

 

Same code has been tested on 0.12.x, 0.14.x, 0.15.x, 1.0.2. All of them have 
the same issue. 

  was:
 

Using custom key generator throws an error when upsert is being performed:

*Spark-SQL entrypoint:*
{code:java}
test("Create MOR table with custom keygen partition field") {
  withTempDir { tmp =>
    val tableName = "hudi_custom_keygen_pt_v8_mor"

    spark.sql(
      s"""
         |CREATE TABLE $tableName (
         |  id INT,
         |  name STRING,
         |  price DOUBLE,
         |  ts LONG,
         |  -- Partition Source Fields --
         |  part_country STRING,
         |  part_date BIGINT
         |) USING hudi
         | LOCATION '${tmp.getCanonicalPath}'
         | TBLPROPERTIES (
         |  primaryKey = 'id',
         |  type = 'mor',
         |  preCombineField = 'ts',
         |  -- Hive style partitioning needs to be disabled for timestamp 
keygen to work --
         |  hoodie.datasource.write.hive_style_partitioning = 'false',
         |  -- Timestamp Keygen and Partition Configs --
         |  hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.CustomKeyGenerator',
         |  hoodie.datasource.write.partitionpath.field = 
'part_country:simple,part_date:timestamp',
         |  hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
         |  hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
         |  hoodie.keygen.timebased.timezone = 'UTC'
         | ) PARTITIONED BY (part_country, part_date)
   """.stripMargin)

    // RecordKey + partition

    // Configure Hudi properties
    spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new 
parquet file for each commit
    spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
    spark.sql(s"SET hoodie.metadata.enable=true")
    spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")

    // Insert data with new partition values
    spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 
1749284360000)")
    spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 
1749204000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 
1749202000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 
1749102000000)")
    spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 
1747102000000)")

    // Generate logs through updates
    spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")

    spark.sql(s"SELECT * FROM $tableName").show(false)
  }
}
{code}
 

Datasource-write entrypoint:
{code:java}
test("Create MOR table with custom keygen partition field") {
    withTempDir { tmp =>
      val tablePath = tmp.getCanonicalPath
      val tableName = "hudi_custom_keygen_pt_v8_mor"

      val hudiOptions = Map[String, String](
        HoodieWriteConfig.TBL_NAME.key -> tableName,
        DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",

        // Custom KeyGenerator and Partitioning Config
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.CustomKeyGenerator",
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> 
"part_country:SIMPLE,part_date:TIMESTAMP",

        // Timestamp-based Partitioning Configs
        "hoodie.keygen.timebased.timestamp.type" -> "EPOCHMILLISECONDS",
        "hoodie.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
        "hoodie.keygen.timebased.timezone" -> "UTC",

        // Disable Hive-style partitioning for CustomKeyGenerator to format the 
date partition
        DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false",

        // Other configs from the SQL SET commands
        "hoodie.parquet.small.file.limit" -> "0",
        "hoodie.metadata.compact.max.delta.commits" -> "1",
        "hoodie.metadata.enable" -> "true",
        "hoodie.metadata.index.column.stats.enable" -> "true"
      )

      spark.createDataFrame(Seq(
        (1, "a1", 100.0, 1000L, "SG", 1749284360000L)
      )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (2, "a2", 200.0, 1000L, "SG", 1749204000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (3, "a3", 101.0, 1001L, "US", 1749202000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (4, "a4", 201.0, 1001L, "CN", 1749102000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      spark.createDataFrame(Seq(
          (5, "a5", 300.0, 1002L, "MY", 1747102000000L)
        )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      // Read the current state of the table
      val hudiTableDF = spark.read.format("hudi").load(tablePath)

      // Apply the update logic
      val updatedDF = hudiTableDF.withColumn("price", 
functions.round(col("price") * 1.02, 2))

      // Write the updated data back
      updatedDF.write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(tablePath)

      val finalDF = spark.read.format("hudi").load(tablePath)

      // Select relevant Hudi meta fields and all original columns to verify 
the update
      finalDF.select("_hoodie_commit_time", "id", "name", "price", "ts", 
"part_country", "part_date")
        .orderBy("id")
        .show(false)

    }
  } {code}
 

Error:
{code:java}
Failed to cast value `2025-06-06` to `LongType` for partition column `part_date`
java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` for 
partition column `part_date`
        at 
org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108) 
{code}
 

 

Same code has been tested on 0.14.x, 0.15.x, 1.0.2. All of them have the same 
issue. 

 

The error is thrown by code that was introduced in 2022.


> Writes with CustomKeyGenerator with TIMESTAMP as one of KEY gens not working
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-9519
>                 URL: https://issues.apache.org/jira/browse/HUDI-9519
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Voon Hou
>            Priority: Major
>
>  
> Using custom key generator throws an error when upsert is being performed:
> *Spark-SQL entrypoint:*
> {code:java}
> test("Create MOR table with custom keygen partition field") {
>   withTempDir { tmp =>
>     val tableName = "hudi_custom_keygen_pt_v8_mor"
>     spark.sql(
>       s"""
>          |CREATE TABLE $tableName (
>          |  id INT,
>          |  name STRING,
>          |  price DOUBLE,
>          |  ts LONG,
>          |  -- Partition Source Fields --
>          |  part_country STRING,
>          |  part_date BIGINT
>          |) USING hudi
>          | LOCATION '${tmp.getCanonicalPath}'
>          | TBLPROPERTIES (
>          |  primaryKey = 'id',
>          |  type = 'mor',
>          |  preCombineField = 'ts',
>          |  -- Hive style partitioning needs to be disabled for timestamp 
> keygen to work --
>          |  hoodie.datasource.write.hive_style_partitioning = 'false',
>          |  -- Timestamp Keygen and Partition Configs --
>          |  hoodie.table.keygenerator.class = 
> 'org.apache.hudi.keygen.CustomKeyGenerator',
>          |  hoodie.datasource.write.partitionpath.field = 
> 'part_country:simple,part_date:timestamp',
>          |  hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
>          |  hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
>          |  hoodie.keygen.timebased.timezone = 'UTC'
>          | ) PARTITIONED BY (part_country, part_date)
>    """.stripMargin)
>     // RecordKey + partition
>     // Configure Hudi properties
>     spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new 
> parquet file for each commit
>     spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
>     spark.sql(s"SET hoodie.metadata.enable=true")
>     spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")
>     // Insert data with new partition values
>     spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 
> 1749284360000)")
>     spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 
> 1749204000000)")
>     spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 
> 1749202000000)")
>     spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 
> 1749102000000)")
>     spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 
> 1747102000000)")
>     // Generate logs through updates
>     spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")
>     spark.sql(s"SELECT * FROM $tableName").show(false)
>   }
> }
> {code}
>  
> Datasource-write entrypoint:
> {code:java}
> test("Create MOR table with custom keygen partition field") {
>     withTempDir { tmp =>
>       val tablePath = tmp.getCanonicalPath
>       val tableName = "hudi_custom_keygen_pt_v8_mor"
>       val hudiOptions = Map[String, String](
>         HoodieWriteConfig.TBL_NAME.key -> tableName,
>         DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
>         DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
>         DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
>         // Custom KeyGenerator and Partitioning Config
>         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
> "org.apache.hudi.keygen.CustomKeyGenerator",
>         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> 
> "part_country:SIMPLE,part_date:TIMESTAMP",
>         // Timestamp-based Partitioning Configs
>         "hoodie.keygen.timebased.timestamp.type" -> "EPOCHMILLISECONDS",
>         "hoodie.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
>         "hoodie.keygen.timebased.timezone" -> "UTC",
>         // Disable Hive-style partitioning for CustomKeyGenerator to format 
> the date partition
>         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false",
>         // Other configs from the SQL SET commands
>         "hoodie.parquet.small.file.limit" -> "0",
>         "hoodie.metadata.compact.max.delta.commits" -> "1",
>         "hoodie.metadata.enable" -> "true",
>         "hoodie.metadata.index.column.stats.enable" -> "true"
>       )
>       spark.createDataFrame(Seq(
>         (1, "a1", 100.0, 1000L, "SG", 1749284360000L)
>       )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       spark.createDataFrame(Seq(
>           (2, "a2", 200.0, 1000L, "SG", 1749204000000L)
>         )).toDF("id", "name", "price", "ts", "part_country", 
> "part_date").write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       spark.createDataFrame(Seq(
>           (3, "a3", 101.0, 1001L, "US", 1749202000000L)
>         )).toDF("id", "name", "price", "ts", "part_country", 
> "part_date").write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       spark.createDataFrame(Seq(
>           (4, "a4", 201.0, 1001L, "CN", 1749102000000L)
>         )).toDF("id", "name", "price", "ts", "part_country", 
> "part_date").write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       spark.createDataFrame(Seq(
>           (5, "a5", 300.0, 1002L, "MY", 1747102000000L)
>         )).toDF("id", "name", "price", "ts", "part_country", 
> "part_date").write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       // Read the current state of the table
>       val hudiTableDF = spark.read.format("hudi").load(tablePath)
>       // Apply the update logic
>       val updatedDF = hudiTableDF.withColumn("price", 
> functions.round(col("price") * 1.02, 2))
>       // Write the updated data back
>       updatedDF.write
>         .format("hudi")
>         .options(hudiOptions)
>         .option(DataSourceWriteOptions.OPERATION.key, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>         .mode(SaveMode.Append)
>         .save(tablePath)
>       val finalDF = spark.read.format("hudi").load(tablePath)
>       // Select relevant Hudi meta fields and all original columns to verify 
> the update
>       finalDF.select("_hoodie_commit_time", "id", "name", "price", "ts", 
> "part_country", "part_date")
>         .orderBy("id")
>         .show(false)
>     }
>   } {code}
>  
> Error:
> {code:java}
> Failed to cast value `2025-06-06` to `LongType` for partition column 
> `part_date`
> java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` 
> for partition column `part_date`
>       at 
> org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
>       at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>       at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>       at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>       at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:108) 
> {code}
>  
>  
> Same code has been tested on 0.12.x, 0.14.x, 0.15.x, 1.0.2. All of them have 
> the same issue. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to