15663671003 opened a new issue, #8287:
URL: https://github.com/apache/hudi/issues/8287

   I implemented a custom payload based on HoodieRecordPayload, but ran into a 
problem. With an incremental query, `record_time` is the value from the 
incoming payload (incorrect). With a snapshot query, `record_time` is the old, 
persisted value (correct). This mismatch does not meet my expectations. Should 
the payload result returned by an incremental query differ from the snapshot 
query result? Please help.
   ```java
    /* Omitted content */
   
   public class CustomPayload extends OverwriteWithLatestAvroPayload {
     /* Omitted content */
   
     @Override
     public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
       if (recordBytes.length == 0) {
         return Option.empty();
       }
   
       GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
       if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
         return Option.of(currentValue);
       }
   
       /*custom code*/
       if (((GenericRecord) currentValue).get("record_time") != null) {
         incomingRecord.put("record_time", ((GenericRecord) currentValue).get("record_time"));
       }
   
       eventTime = updateEventTime(incomingRecord, properties);
       return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
     }
   
    /* Omitted content */
   
     protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                                   IndexedRecord incomingRecord, Properties properties) {
   
        /* Omitted content */
       
       return (((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0)
           && (((GenericRecord) currentValue).get("valid").equals(true)
               || ((GenericRecord) incomingRecord).get("valid").equals(true))
           && (((GenericRecord) currentValue).get("content_md5") == null
               || !((GenericRecord) currentValue).get("content_md5").equals(((GenericRecord) incomingRecord).get("content_md5")));
     }
   
   }
   
   ```
   **Expected behavior**
   ```shell
   >>> spark.read.format("hudi").load("*****").filter("*********").show(truncate=False)
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno        |_hoodie_record_key                                              |_hoodie_partition_path|_hoodie_file_name                                                       |valid|update_time        |record_time        |content_md5                     |id                                                              |
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |20230324152704225  |20230324152704225_13_2563860|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|                      |00000013-ea30-4f9d-9704-e4f82fceb940-0                                  |false|2023-03-24 14:48:09|2023-03-01 21:40:42|df3c2a9f8eaf5b8eec26b363cc67003f|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|
   
   
   
   >>> df = spark.read.format("hudi").options(**{'hoodie.datasource.query.type': "incremental", "hoodie.datasource.read.begin.instanttime": '20230324151944584'}).load("******")
   >>> spark.read.format("hudi").load("*****").filter("*********").show(truncate=False)
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno        |_hoodie_record_key                                              |_hoodie_partition_path|_hoodie_file_name                                                       |valid|update_time        |record_time        |content_md5                     |id                                                              |
   +-------------------+----------------------------+----------------------------------------------------------------+----------------------+------------------------------------------------------------------------+-----+-------------------+-------------------+--------------------------------+----------------------------------------------------------------+
   |20230324152704225  |20230324152704225_13_2563860|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|                      |00000013-ea30-4f9d-9704-e4f82fceb940-0                                  |false|2023-03-24 14:48:09|2023-03-24 14:48:09|df3c2a9f8eaf5b8eec26b363cc67003f|7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc|
   
   ```
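
   To look at the same keys under both query types side by side, something like the following PySpark helper could be used. It is only a sketch: the function name `compare_record_time` and its parameters `base_path` and `begin_instant` are hypothetical, while the two `hoodie.datasource.*` options are the ones already used above. It assumes the table has a `record_time` column and the standard `_hoodie_record_key` meta column.

   ```python
   def compare_record_time(spark, base_path, begin_instant, key_col="_hoodie_record_key"):
       """Return rows whose record_time differs between snapshot and incremental reads.

       `spark` is an existing SparkSession; `base_path` and `begin_instant`
       are placeholders for the table path and the starting commit instant.
       """
       # Snapshot query (the default query type).
       snapshot = (
           spark.read.format("hudi")
           .load(base_path)
           .select(key_col, "record_time")
           .withColumnRenamed("record_time", "snapshot_record_time")
       )
       # Incremental query starting after `begin_instant`.
       incremental = (
           spark.read.format("hudi")
           .options(**{
               "hoodie.datasource.query.type": "incremental",
               "hoodie.datasource.read.begin.instanttime": begin_instant,
           })
           .load(base_path)
           .select(key_col, "record_time")
           .withColumnRenamed("record_time", "incremental_record_time")
       )
       # Keep only keys present in both results whose record_time disagrees.
       return snapshot.join(incremental, on=key_col, how="inner").filter(
           "snapshot_record_time <> incremental_record_time"
       )
   ```

   For example, `compare_record_time(spark, "<table path>", "20230324151944584").show(truncate=False)` would list the keys whose `record_time` differs between the two reads.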
   
   I read from Kafka with Spark Structured Streaming and write to Hudi. There 
are two time fields, `update_time` and `record_time`. By default, both fields 
hold the same value. The custom payload decides whether a record should be 
updated. If it is updated, the `record_time` of the persisted record is carried 
over into the new record. I expected `record_time` in an incremental query to 
match the snapshot query, but the results differ. Why is this? Please help.
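
   The merge rule described above can be sketched in plain Python, independent of Hudi. This is not the Hudi API: `needs_update` and `merge` are hypothetical names that mirror the Java payload, delete handling is left out, and `update_time` as the ordering field is an assumption (the ordering logic is omitted from the Java snippet). The `valid` and `content_md5` values of the persisted record are made up for illustration.

   ```python
   from typing import Any, Dict

   Record = Dict[str, Any]


   def needs_update(current: Record, incoming: Record,
                    ordering_field: str = "update_time") -> bool:
       """Mirror of needUpdatingPersistedRecord (ordering field is an assumption)."""
       newer = current[ordering_field] < incoming[ordering_field]
       either_valid = current["valid"] is True or incoming["valid"] is True
       content_changed = (
           current.get("content_md5") is None
           or current["content_md5"] != incoming.get("content_md5")
       )
       return newer and either_valid and content_changed


   def merge(current: Record, incoming: Record) -> Record:
       """Mirror of combineAndGetUpdateValue, without delete handling."""
       if not needs_update(current, incoming):
           return current
       merged = dict(incoming)
       # Custom rule: keep the record_time of the persisted record.
       if current.get("record_time") is not None:
           merged["record_time"] = current["record_time"]
       return merged


   # Persisted record; `valid` and `content_md5` here are illustrative values.
   persisted = {
       "valid": True,
       "update_time": "2023-03-01 21:40:42",
       "record_time": "2023-03-01 21:40:42",
       "content_md5": "0" * 32,
   }
   # Incoming record: both time fields start out equal.
   incoming = {
       "valid": False,
       "update_time": "2023-03-24 14:48:09",
       "record_time": "2023-03-24 14:48:09",
       "content_md5": "df3c2a9f8eaf5b8eec26b363cc67003f",
   }

   merged = merge(persisted, incoming)
   ```

   Under these assumptions `merged` takes `update_time` from the incoming record and keeps `record_time` from the persisted one, which is the combination the snapshot query returns.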
   
   
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   
   * Spark version : 3.2.2
   
   * Hive version : 2.1.1
   
   * Hadoop version : 3.0.0
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   

