praneethh opened a new issue, #7335:
URL: https://github.com/apache/hudi/issues/7335

   Whenever a late-arriving record has a lower precombine field value than an existing record with a higher precombine field value, the new record should be discarded; it should not upsert the existing record. The record key should also be unique across all partitions.
   
   Steps to reproduce the behavior:
   
   1) Preparing the data
   
   ```
   scala> val df1 = df.filter($"rcvd_dt"==="2022-12-03")
   df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ts: string, name: string ... 3 more fields]
   
   scala> df1.show
   +--------------------+-----------+-------------------+----------+----------+
   |                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
   +--------------------+-----------+-------------------+----------+----------+
   |2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
   |2022-11-29T09:47:...|Fake Name 4|[email protected]|2022-11-29|2022-12-03|
   +--------------------+-----------+-------------------+----------+----------+
   ```
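   For completeness, `df` above can be reconstructed from the sample rows like this (hypothetical; the original data came from elsewhere, only the schema and values shown matter):

   ```scala
   import spark.implicits._

   // Hypothetical construction of the source DataFrame shown above
   val df = Seq(
     ("2022-12-02T09:47:00", "Fake Name 5", "[email protected]", "2022-12-02", "2022-12-03"),
     ("2022-11-29T09:47:00", "Fake Name 4", "[email protected]", "2022-11-29", "2022-12-03")
   ).toDF("ts", "name", "email", "src_rcv_dt", "rcvd_dt")
   ```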
   2) Initial load
   
   ```
   df1.write.format("hudi").options(Map(
     "hoodie.table.name" -> "pharpan_poc",
     "hoodie.datasource.write.recordkey.field" -> "name",
     "hoodie.datasource.write.partitionpath.field" -> "src_rcv_dt",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.datasource.write.precombine.field" -> "ts",
     "hoodie.payload.ordering.field" -> "ts",
     "hoodie.index.type" -> "GLOBAL_SIMPLE",
     "hoodie.upsert.shuffle.parallelism" -> "1",
     "hoodie.simple.index.update.partition.path" -> "true",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"))
     .mode("append")
     .save("gs://xxxxxxx/pharpan/hudi_sample_output_data3")
   ```
   
   3) Initial load output
   
   ```
   scala> spark.read.format("hudi").load("gs://xxxxxxxx/pharpan/hudi_sample_output_data3").show()
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   |  20221129014347955|20221129014347955...|       Fake Name 5|  src_rcv_dt=2022-1...|ca15b408-72d2-450...|2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
   |  20221129014347955|20221129014347955...|       Fake Name 4|  src_rcv_dt=2022-1...|04d66082-4f5e-443...|2022-11-29T09:47:...|Fake Name 4|[email protected]|2022-11-29|2022-12-03|
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   ```
   
   4) Load a new set of data. Here I'm loading the same record key, Fake Name 4, again, with an older ts value of 2022-11-28T09:47 and src_rcv_dt of 2022-11-28. Since this ts is lower than that of the current Fake Name 4 record (2022-11-29T09:47), the update should be skipped. However, it's updating.
   
   ```
   scala> df1.show
   +--------------------+-----------+-------------------+----------+----------+
   |                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
   +--------------------+-----------+-------------------+----------+----------+
   |2022-12-04T09:47:...|Fake Name 6|fakename56email.com|2022-12-06|2022-12-05|
   |2022-11-28T09:47:...|Fake Name 4|[email protected]|2022-11-28|2022-12-05|
   +--------------------+-----------+-------------------+----------+----------+
   
   ```
   
   5) Second load and output:
   
   ```
   df1.write.format("hudi").options(Map(
     "hoodie.table.name" -> "pharpan_poc",
     "hoodie.datasource.write.recordkey.field" -> "name",
     "hoodie.datasource.write.partitionpath.field" -> "src_rcv_dt",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.payload.ordering.field" -> "ts",
     "hoodie.index.type" -> "GLOBAL_SIMPLE",
     "hoodie.upsert.shuffle.parallelism" -> "1",
     "hoodie.simple.index.update.partition.path" -> "true",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"))
     .mode("append")
     .save("gs://xxxxx/pharpan/hudi_sample_output_data3")
   
   
   
   scala> spark.read.format("hudi").load("gs://xxxxxx/pharpan/hudi_sample_output_data3").show()
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                  ts|       name|              email|src_rcv_dt|   rcvd_dt|
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   |  20221129014347955|20221129014347955...|       Fake Name 5|  src_rcv_dt=2022-1...|ca15b408-72d2-450...|2022-12-02T09:47:...|Fake Name 5|[email protected]|2022-12-02|2022-12-03|
   |  20221129014819701|20221129014819701...|       Fake Name 6|  src_rcv_dt=2022-1...|efb71bab-2c6b-4df...|2022-12-04T09:47:...|Fake Name 6|fakename56email.com|2022-12-06|2022-12-05|
   |  20221129014819701|20221129014819701...|       Fake Name 4|  src_rcv_dt=2022-1...|72aaefe5-3526-4a9...|2022-11-28T09:47:...|Fake Name 4|[email protected]|2022-11-28|2022-12-05|
   +-------------------+--------------------+------------------+----------------------+--------------------+--------------------+-----------+-------------------+----------+----------+
   ```
   
   In the output above, the new record key Fake Name 6 was inserted as expected, but the Fake Name 4 record was also updated, which shouldn't happen since its incoming timestamp is lower.
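   Until this is resolved, one possible workaround is to pre-filter the incoming batch against the current table state before writing, keeping only rows whose ordering field is strictly newer. A sketch under the assumptions that the table fits a join and that `incomingDf` is the batch to write (column names and path as above; `incomingDf`, `e_name`, `e_ts` are hypothetical names):

   ```scala
   // Hypothetical pre-filter: join the incoming batch against the current
   // table state and keep only rows with a newer ordering field (ts).
   val existing = spark.read.format("hudi")
     .load("gs://xxxxxxx/pharpan/hudi_sample_output_data3")
     .select($"name".as("e_name"), $"ts".as("e_ts"))

   val toWrite = incomingDf
     .join(existing, incomingDf("name") === existing("e_name"), "left")
     .filter($"e_ts".isNull || $"ts" > $"e_ts")
     .drop("e_name", "e_ts")
   // toWrite can then be upserted with the same Hudi options as above
   ```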
   
   **Environment Description**
   
   * Hudi version : 0.12.0
   
   * Spark version : 3.1
   
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) :no

