praneethh opened a new issue, #7335: URL: https://github.com/apache/hudi/issues/7335
Whenever a late-arriving record has a lesser precombine field value than an existing record with a greater precombine field value, the new record should be discarded. It should not update the currently existing record. Also, the record key has to be unique across all partitions.

Steps to reproduce the behavior:

1) Preparing the data

```
scala> val df1 = df.filter($"rcvd_dt"==="2022-12-03")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ts: string, name: string ... 3 more fields]

scala> df1.show
+--------------------+-----------+-------------------+----------+----------+
|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
+--------------------+-----------+-------------------+----------+----------+
|2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
|2022-11-29T09:47:...|Fake Name 4|[email protected]|2022-11-29|2022-12-03|
+--------------------+-----------+-------------------+----------+----------+
```

2) Initial load

```
df1.write.format("hudi").options(Map(
    "hoodie.table.name" -> "pharpan_poc",
    "hoodie.datasource.write.recordkey.field" -> "name",
    "hoodie.datasource.write.partitionpath.field" -> "src_rcv_dt",
    "hoodie.datasource.write.operation" -> "upsert",
    "hoodie.datasource.write.precombine.field" -> "ts",
    "hoodie.payload.ordering.field" -> "ts",
    "hoodie.index.type" -> "GLOBAL_SIMPLE",
    "hoodie.upsert.shuffle.parallelism" -> "1",
    "hoodie.simple.index.update.partition.path" -> "true",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"))
  .mode("append")
  .save("gs://xxxxxxx/pharpan/hudi_sample_output_data3")
```

3) Initial load output

```
scala> spark.read.format("hudi").load("gs://xxxxxxxx/pharpan/hudi_sample_output_data3").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
|  20221129014347955|20221129014347955...|       Fake Name 5|  src_rcv_dt=2022-1...|ca15b408-72d2-450...|2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
|  20221129014347955|20221129014347955...|       Fake Name 4|  src_rcv_dt=2022-1...|04d66082-4f5e-443...|2022-11-29T09:47:...|Fake Name 4|[email protected]|2022-11-29|2022-12-03|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
```

4) Load a new set of data. Here, I'm loading the same record key, Fake Name 4, again, but with an older ts value of 2022-11-28T09:47 and src_rcv_dt of 2022-11-28. Since this ts value is less than that of the current Fake Name 4 record (2022-11-29T09:47), the update should be skipped. However, it updates.
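The merge behavior expected here can be modeled in a few lines of plain Scala. This is a hypothetical sketch, not Hudi's implementation: the `Record` class and `merge` function are made up for illustration, and a string comparison on `ts` stands in for the ordering-field check (ISO-8601 timestamps compare correctly as strings). An ordering-field-aware payload such as `DefaultHoodieRecordPayload` is expected to behave like this, keeping whichever record has the greater ordering field:

```scala
// Hypothetical model of ordering-field-based merging; NOT Hudi code.
case class Record(name: String, ts: String, email: String)

// The incoming record wins only when its ordering field is >= the stored
// one; a late arrival with a smaller ts leaves the stored row untouched.
def merge(stored: Record, incoming: Record): Record =
  if (incoming.ts >= stored.ts) incoming else stored

val stored = Record("Fake Name 4", "2022-11-29T09:47:00", "[email protected]")
val late   = Record("Fake Name 4", "2022-11-28T09:47:00", "[email protected]")

assert(merge(stored, late) == stored) // late arrival is discarded
println(merge(stored, late).ts)       // prints 2022-11-29T09:47:00
```

Under this model, the second write in the reproduction below should leave the stored Fake Name 4 row unchanged, which is not what the table shows.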
```
scala> df1.show
+--------------------+-----------+-------------------+----------+----------+
|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
+--------------------+-----------+-------------------+----------+----------+
|2022-12-04T09:47:...|Fake Name 6|fakename56email.com|2022-12-06|2022-12-05|
|2022-11-28T09:47:...|Fake Name 4|[email protected]|2022-11-28|2022-12-05|
+--------------------+-----------+-------------------+----------+----------+
```

5) Output:

```
df1.write.format("hudi").options(Map(
    "hoodie.table.name" -> "pharpan_poc",
    "hoodie.datasource.write.recordkey.field" -> "name",
    "hoodie.datasource.write.partitionpath.field" -> "src_rcv_dt",
    "hoodie.datasource.write.operation" -> "upsert",
    "hoodie.payload.ordering.field" -> "ts",
    "hoodie.index.type" -> "GLOBAL_SIMPLE",
    "hoodie.upsert.shuffle.parallelism" -> "1",
    "hoodie.simple.index.update.partition.path" -> "true",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"))
  .mode("append")
  .save("gs://xxxxx/pharpan/hudi_sample_output_data3")

scala> spark.read.format("hudi").load("gs://xxxxxx/pharpan/hudi_sample_output_data3").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
|  20221129014347955|20221129014347955...|       Fake Name 5|  src_rcv_dt=2022-1...|ca15b408-72d2-450...|2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
|  20221129014819701|20221129014819701...|       Fake Name 6|  src_rcv_dt=2022-1...|efb71bab-2c6b-4df...|2022-12-04T09:47:...|Fake Name 6|fakename56email.com|2022-12-06|2022-12-05|
|  20221129014819701|20221129014819701...|       Fake Name 4|  src_rcv_dt=2022-1...|72aaefe5-3526-4a9...|2022-11-28T09:47:...|Fake Name 4|[email protected]|2022-11-28|2022-12-05|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
```

In the output above, the new record key Fake Name 6 was inserted as expected; however, the Fake Name 4 record was also updated, which shouldn't happen since its timestamp is lower.

**Environment Description**

* Hudi version : 0.12.0
* Spark version : 3.1
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : no
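One difference between the two write commands in the report is worth noting: the second write's option map omits `hoodie.datasource.write.precombine.field`, which the initial load set to `ts`. Whether that omission explains the observed update is not established here; the sketch below simply shows the option map with that key restored, as a plain Scala `Map` (no Spark or Hudi required), assuming all other options are unchanged:

```scala
// Write options as in the initial load. The second write in the report
// dropped the precombine.field entry; it is restored here for comparison.
val hudiWriteOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "pharpan_poc",
  "hoodie.datasource.write.recordkey.field" -> "name",
  "hoodie.datasource.write.partitionpath.field" -> "src_rcv_dt",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.precombine.field" -> "ts", // restored
  "hoodie.payload.ordering.field" -> "ts",
  "hoodie.index.type" -> "GLOBAL_SIMPLE",
  "hoodie.upsert.shuffle.parallelism" -> "1",
  "hoodie.simple.index.update.partition.path" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ")

// Sanity check that both timestamp-ordering options are present.
assert(hudiWriteOptions.contains("hoodie.datasource.write.precombine.field"))
assert(hudiWriteOptions("hoodie.payload.ordering.field") == "ts")
```

A retest with this option set on both writes would show whether the missing precombine field is related to the behavior.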
