15663671003 opened a new issue, #9869:
URL: https://github.com/apache/hudi/issues/9869

   I implemented a custom payload based on HoodieRecordPayload.java, but I am
seeing inconsistent results. With an incremental query, record_time is the
value from the incoming payload (incorrect, not what I expect). With a snapshot
query, record_time is the old, persisted value (correct). Should the record
returned by an incremental query differ from the snapshot query result? Please
help.
   ```java
    /* Omitted content */
   
   public class CustomPayload extends OverwriteWithLatestAvroPayload {
     /* Omitted content */
   
     @Override
     public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
       if (recordBytes.length == 0) {
         return Option.empty();
       }
   
       GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
       if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
         return Option.of(currentValue);
       }
   
       /* custom code */
       if (((GenericRecord) currentValue).get("record_time") != null) {
         incomingRecord.put("record_time", ((GenericRecord) currentValue).get("record_time"));
       }
   
       eventTime = updateEventTime(incomingRecord, properties);
       return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
     }
   
    /* Omitted content */
   
     protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                                   IndexedRecord incomingRecord, Properties properties) {
   
        /* Omitted content */
       
       return (((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0)
           && (((GenericRecord) currentValue).get("valid").equals(true)
               || ((GenericRecord) incomingRecord).get("valid").equals(true))
           && (((GenericRecord) currentValue).get("content_md5") == null
               || !((GenericRecord) currentValue).get("content_md5").equals(((GenericRecord) incomingRecord).get("content_md5")));
     }
   
   }
   
   ```
   **Expected behavior**
   snapshot query
   ```shell
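   >>> spark.read.format("hudi").load("*****").filter("*********").show(truncate=False)
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno        |_hoodie_record_key                                              |_hoodie_partition_path|_hoodie_file_name                                                       |valid|update_time        |record_time        |content_md5                     |id                                                              |
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |20230324152704225  |20230324152704225_13_2563860|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|                      |00000013-ea30-4f9d-9704-e4f82fceb940-0                                  |false|2023-03-24 14:48:09|2023-03-01 21:40:42|df3c2a9f8eaf5b8eec26b363cc67003f|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+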
   ```
   incremental query
   ```shell
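   >>> df = spark.read.format("hudi").options(**{'hoodie.datasource.query.type': "incremental", "hoodie.datasource.read.begin.instanttime": '20230324151944584'}).load("******")
   >>> df.filter("*********").show(truncate=False)
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno        |_hoodie_record_key                                              |_hoodie_partition_path|_hoodie_file_name                                                       |valid|update_time        |record_time        |content_md5                     |id                                                              |
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |20230324152704225  |20230324152704225_13_2563860|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|                      |00000013-ea30-4f9d-9704-e4f82fceb940-0                                  |false|2023-03-24 14:48:09|2023-03-24 14:48:09|df3c2a9f8eaf5b8eec26b363cc67003f|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+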
   ```
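
To make the discrepancy explicit, here is a small sketch that diffs the two rows from the query outputs above as plain dictionaries. The helper name `diff_rows` is hypothetical, and only a subset of the columns is copied in; it does not touch Spark or Hudi.

```python
# Values copied from the snapshot and incremental outputs above (subset of columns).
snapshot_row = {
    "_hoodie_commit_time": "20230324152704225",
    "valid": False,
    "update_time": "2023-03-24 14:48:09",
    "record_time": "2023-03-01 21:40:42",
    "content_md5": "df3c2a9f8eaf5b8eec26b363cc67003f",
}
incremental_row = {
    "_hoodie_commit_time": "20230324152704225",
    "valid": False,
    "update_time": "2023-03-24 14:48:09",
    "record_time": "2023-03-24 14:48:09",
    "content_md5": "df3c2a9f8eaf5b8eec26b363cc67003f",
}


def diff_rows(left, right):
    """Return {column: (left_value, right_value)} for columns whose values differ."""
    return {
        column: (left[column], right.get(column))
        for column in left
        if left[column] != right.get(column)
    }


differences = diff_rows(snapshot_row, incremental_row)
print(differences)
# → {'record_time': ('2023-03-01 21:40:42', '2023-03-24 14:48:09')}
```

Both rows carry the same `_hoodie_commit_time`, so they come from the same commit; only record_time differs.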
   
   I read from Kafka with Spark Structured Streaming and write to Hudi using
foreachBatch. There are two time fields, **update_time** and **record_time**.
By default, both fields hold the same value in an incoming record. The custom
payload decides whether the persisted record should be updated; if it is
updated, the persisted record_time overwrites the record_time of the incoming
payload. I therefore expect record_time in an incremental query to match the
snapshot query, but the results differ. Why is this? Please help.
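
The merge semantics I intend can be modelled in a few lines of plain Python. This is only a sketch of the logic, not the actual Java payload: the function names are hypothetical, the ordering field is assumed to be update_time (the real ordering code is omitted above), delete handling is left out, and the persisted row in the example is made up for illustration.

```python
def need_update(persisted, incoming, ordering_field="update_time"):
    """Mirror of needUpdatingPersistedRecord; ordering_field is an assumption."""
    newer = persisted[ordering_field] < incoming[ordering_field]
    any_valid = persisted["valid"] is True or incoming["valid"] is True
    md5_changed = (
        persisted["content_md5"] is None
        or persisted["content_md5"] != incoming["content_md5"]
    )
    return newer and any_valid and md5_changed


def combine(persisted, incoming):
    """Mirror of combineAndGetUpdateValue, without delete handling."""
    if not need_update(persisted, incoming):
        return persisted
    merged = dict(incoming)  # copy so the incoming record is not mutated
    if persisted.get("record_time") is not None:
        # Keep the first-seen record_time from the persisted record.
        merged["record_time"] = persisted["record_time"]
    return merged


# Hypothetical persisted row; only record_time matches the snapshot output.
persisted = {
    "valid": True,
    "update_time": "2023-03-01 21:40:42",
    "record_time": "2023-03-01 21:40:42",
    "content_md5": "old-md5",
}
# Incoming row: update_time and record_time are equal by default.
incoming = {
    "valid": False,
    "update_time": "2023-03-24 14:48:09",
    "record_time": "2023-03-24 14:48:09",
    "content_md5": "df3c2a9f8eaf5b8eec26b363cc67003f",
}

merged = combine(persisted, incoming)
print(merged["update_time"], merged["record_time"])
# → 2023-03-24 14:48:09 2023-03-01 21:40:42
```

The merged record is what the snapshot query returns, and it is what I expected the incremental query to return as well.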
   
   
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   
   * Spark version : 3.2.2
   
   * Hive version : 2.1.1
   
   * Hadoop version : 3.0.0
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   

