cbg-wx opened a new issue, #17642:
URL: https://github.com/apache/hudi/issues/17642

   ### Bug Description
   
   **What happened:** Canal captures changes from MySQL and sends them to Kafka. 
The records arrive in a different order from the order in which they were 
produced, and they include delete records. Although EventTimeAvroPayload is 
configured, the records are not ordered correctly by the preCombine field 
after Flink consumes them and writes them to Hudi.
   
   **What you expected:** With EventTimeAvroPayload, when Flink writes data to 
Hudi, out-of-order records should be resolved correctly according to the 
preCombine field.
   
   **Steps to reproduce:**
   **1. Sample data:**
   CREATE TABLE IF NOT EXISTS prodcuts (
     id BIGINT,
     name STRING,
     classify STRING,
     description STRING,
     weight DECIMAL(7,2),
     ts STRING,
     _hoodie_is_deleted BOOLEAN
   ) USING hudi
   TBLPROPERTIES (
     type = 'mor',
     primaryKey = 'id',
     precombineField = 'ts',
     hoodie.index.type='BUCKET',
     hoodie.bucket.index.num.buckets='4',
     'hoodie.compaction.payload.class'='org.apache.hudi.common.model.EventTimeAvroPayload'
   );
   
   Product('1','apple','north fruit','98.8','2025-12-02 08:30:08.453900','false','20251127') // insert record
   Product('1','apple','north fruit','98.8','2025-10-28 16:31:11.453954','true','20251127') // delete record
   
   **2. Flink SQL sink into the MOR table:**
   CREATE TABLE prodcuts(
     id BIGINT,
     name STRING,
     classify STRING,
     description STRING,
     weight DECIMAL(7,2),
     ts STRING,
     _hoodie_is_deleted BOOLEAN
   ) WITH (
       'connector' = 'hudi',
       'path' = '/User/chenbengang/workspace/hudi-demo/products',
       'table.type' = 'MERGE_ON_READ',
       'write.operation'='upsert',
       'index.type' = 'BUCKET',
       'hoodie.bucket.index.num.buckets'='4',
       'hoodie.datasource.write.recordkey.field'='id',
       'hoodie.datasource.write.partitionpath.field'='classify',
       'preCombine.field'='ts',
       'payload.class' = 'org.apache.hudi.common.model.EventTimeAvroPayload',
       'write.precombine'='true',
       'compaction.schedule.enabled'='true',
       'compaction.async.enabled'='true'
   );
   Spark SQL query: select * from prodcuts;
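
To make the expected result concrete, here is a minimal Python sketch of event-time merging applied to the two sample records. It is not Hudi code: the `Change` type, its field names, and `merge_by_event_time` are hypothetical, and the tie-breaking rule (a later arrival wins when timestamps are equal) is an assumption. Under these assumptions the delete record, which arrives second but carries the older `ts`, should not remove the row.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class Change:
    """Hypothetical change record; only id, ts and the delete flag matter here."""
    id: int
    name: str
    ts: str           # ordering value, fixed-width 'YYYY-MM-DD HH:MM:SS.ffffff'
    is_deleted: bool


def merge_by_event_time(changes: Iterable[Change]) -> Dict[int, Change]:
    """Keep, per key, the change with the greatest ts, then drop deleted keys."""
    latest: Dict[int, Change] = {}
    for change in changes:
        current = latest.get(change.id)
        # Fixed-width timestamp strings compare chronologically as plain strings.
        # Assumption: on equal timestamps the later arrival wins.
        if current is None or change.ts >= current.ts:
            latest[change.id] = change
    # A key whose winning change is a delete is absent from the result.
    return {key: c for key, c in latest.items() if not c.is_deleted}


# Arrival order from the sample data: insert first, then the older delete.
arrivals = [
    Change(1, "apple", "2025-12-02 08:30:08.453900", False),
    Change(1, "apple", "2025-10-28 16:31:11.453954", True),
]
result = merge_by_event_time(arrivals)
```

In this sketch `result` still contains the row with id 1, and the outcome is the same if the two records arrive in the opposite order.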
   
   **3. Problem description:**
   **Scene 1:** With compaction turned off, so that the insert record (arrives 
first) and the delete record (arrives second) are both in the log: in 
HoodieMergedLogRecordScanner#processNextDeletedRecord, the insert record class 
is Utf8 and the delete record class is String.
   **Scene 2:** With compaction turned off, so that the insert record (arrives 
first) and the delete record (arrives second) are both in the log: in 
EventTimeAvroPayload#preCombine, delete record class is String, delete record 
class is Utf8.
   **Scene 3:** With compaction turned on, so that the insert record (arrives 
first) is in parquet and the delete record (arrives second) is in the log: in 
EventTimeAvroPayload#combineAndGetUpdateValue, delete record class is Utf8, 
delete record class is String.
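
The three scenes share one pattern: the two values being compared have different string classes. The Python sketch below only illustrates why that can matter. `Utf8Like` is a hypothetical stand-in for a byte-backed string type, not Avro's `Utf8`, and this issue contains no stack trace, so how Hudi actually reacts to the mismatch is not shown here. The sketch assumes that converting both sides to a plain string before comparing is acceptable for the `ts` field.

```python
class Utf8Like:
    """Hypothetical byte-backed string that only compares with its own type."""

    def __init__(self, text: str) -> None:
        self.data = text.encode("utf-8")

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Utf8Like):
            return NotImplemented  # refuse cross-type comparison
        return self.data < other.data

    def __str__(self) -> str:
        return self.data.decode("utf-8")


def is_older_raw(a, b) -> bool:
    """Compare ordering values as they are; fails when the types differ."""
    return a < b


def is_older_normalized(a, b) -> bool:
    """Convert both ordering values to str first, then compare."""
    return str(a) < str(b)


# One value read back as a byte-backed string, the other as a plain string.
insert_ts = Utf8Like("2025-12-02 08:30:08.453900")
delete_ts = "2025-10-28 16:31:11.453954"

try:
    is_older_raw(delete_ts, insert_ts)
    mixed_comparison_failed = False
except TypeError:
    mixed_comparison_failed = True

delete_is_older = is_older_normalized(delete_ts, insert_ts)
```

Here the raw comparison raises `TypeError`, while the normalized comparison reports that the delete record's `ts` is older than the insert record's.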
   
   
   ### Environment
   
   **Hudi version:** hudi-0.14.1, hudi-0.15.0, hudi-1.0.2
   **Query engine:** Spark-3.4.2
   **Relevant configs:**
   
   
   ### Logs and Stack Trace
   
   _No response_

