Toroidals commented on issue #13000:
URL: https://github.com/apache/hudi/issues/13000#issuecomment-2738949582

   > I see for your test, the source records itselft is kind of random 
generated, for every batch of inputs, the arrival sequence is already 
different, maybe you should make the inputs as explicit sequence first.
   
   @danny0405  
   
   **Build test data:** According to this logic, when writing to the topic, the final write order for each ID in Hudi should be version = 3, followed by version = 2, and finally version = 1.
   
```java
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlinkKafkaProducerTest02 {

    public static Random random = new Random();

    public static void main(String[] args) throws Exception {

        ...
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        long tsMs1;
        try {
            for (int id = 0; id < 100000; id++) {
                tsMs1 = System.currentTimeMillis();
                String message1 = "{\"op\":\"c\",\"ts_ms\":" + tsMs1 + ",\"before\":{\"ID\":\"" + (id - 1) + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + (id - 1) + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
                String message2 = "{\"op\":\"u\",\"ts_ms\":" + tsMs1 + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
                String message3 = "{\"op\":\"u\",\"ts_ms\":" + tsMs1 + ",\"before\":{\"ID\":\"" + (id + 1) + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + (id + 1) + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";

                ProducerRecord<String, String> record1 = new ProducerRecord<>(topicName, "key-", message1);
                ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName, "key-", message2);
                ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName, "key-", message3);

                producer.send(record1);
                producer.send(record2);
                producer.send(record3);
                producer.flush();
                Thread.sleep(random.nextInt(5) * 1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the producer on both success and failure (was only closed in catch)
            producer.close();
        }
    }
}
```
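The loop's index arithmetic determines the per-ID arrival order. As a sanity check, the following hypothetical helper (class and method names are mine, not part of the job) replays the same arithmetic without Kafka and records which VERSION values each ID receives, in order:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArrivalOrderCheck {

    // For iteration i the producer emits (ID=i-1, v1), (ID=i, v2), (ID=i+1, v3),
    // so each interior ID receives VERSION 3 first, then 2, then 1.
    public static List<Integer> arrivalVersions(int targetId, int iterations) {
        Map<Integer, List<Integer>> perId = new HashMap<>();
        for (int id = 0; id < iterations; id++) {
            perId.computeIfAbsent(id - 1, k -> new ArrayList<>()).add(1);
            perId.computeIfAbsent(id, k -> new ArrayList<>()).add(2);
            perId.computeIfAbsent(id + 1, k -> new ArrayList<>()).add(3);
        }
        return perId.get(targetId);
    }

    public static void main(String[] args) {
        System.out.println(arrivalVersions(5, 100)); // [3, 2, 1]
    }
}
```

For every interior ID the order is [3, 2, 1]; only the last two IDs (99 and 100 with 100 iterations) never receive a version = 1 message, which matches the "last two records" exception in the expected result.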
   
**Hudi sink:**

```java
HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
options.put("catalog.path", "hdfs:///apps/hudi/catalog/");
String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
ArrayList<ArrayList> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList>>() {
});
log.info("fieldList: {}", fieldList.toString());
for (ArrayList columnList : fieldList) {
    builder.column("" + columnList.get(0) + " " + columnList.get(1));
}

builder.pk("id");
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), OverwriteWithLatestMerger.class.getName());

options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "16");
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), "SIMPLE");

options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_or_time");
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "10");
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "30");
options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), "100");

options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), "thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), "jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");

options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");

// options is Map<String, String>, so the value must be the String "8", not the int 8
options.put(FlinkOptions.WRITE_TASKS.key(), "8");

options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());

builder.options(options);
return builder;
```
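With PRECOMBINE_FIELD = ts_ms and an event-time payload configured as above, the surviving row per key should be the one with the larger ts_ms. The sketch below is my simplification of that semantics (not Hudi's actual EventTimeAvroPayload or OverwriteWithLatestMerger code): since each ID's version 3, 2, 1 messages come from three consecutive loop iterations with non-decreasing timestamps, version 1 should win.

```java
public class EventTimeMergeSketch {

    static final class Rec {
        final int version;
        final long tsMs;
        Rec(int version, long tsMs) { this.version = version; this.tsMs = tsMs; }
    }

    // Merge `incoming` into `current`: the larger precombine value (ts_ms)
    // survives; on a tie the later arrival overwrites.
    static Rec merge(Rec current, Rec incoming) {
        return incoming.tsMs >= current.tsMs ? incoming : current;
    }

    public static void main(String[] args) {
        // Arrival order per ID is v3, v2, v1 with increasing ts_ms:
        Rec survivor = merge(merge(new Rec(3, 1000L), new Rec(2, 1001L)), new Rec(1, 1002L));
        System.out.println("surviving version = " + survivor.version); // surviving version = 1
    }
}
```

Under this model, any row left with version 2 or 3 (other than the boundary IDs that never receive version 1) indicates the merge did not pick the latest event time.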
   
   **Query result of Table ods_rbs_hudi_mor_part_test_02_01:**

```sql
select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_01
where version not in (1);
```
   
![Image](https://github.com/user-attachments/assets/55e7f08e-7f26-4ffd-8e69-828800ad7e30)
   
   **Query result of Table ods_rbs_hudi_mor_part_test_02_02:**

```sql
select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_02
where version not in (1);
```
   
   
![Image](https://github.com/user-attachments/assets/9fa9010f-5d71-4d91-8286-0c541dedb425)
   
   The expected result is that only version = 1 data should remain in the Hudi table (except for the last two records, whose IDs never receive a version = 1 message), so the query above should return almost no rows. It is evident that the records with id = 56 and id = 57 in the ods_rbs_hudi_mor_part_test_02_01 table are abnormal.
   
   The query result for id = 56 and id = 57 in the 
ods_rbs_hudi_mor_part_test_02_02 table is as follows:
   
   
![Image](https://github.com/user-attachments/assets/28ae1db2-e783-460d-b5fc-d1e36630a91e)
   This is the expected result. The filtered data from the topic for id=56 and id=57 is as follows:
   
   
![Image](https://github.com/user-attachments/assets/67c71654-1eb9-41ce-818b-60943099cba7)
   Both tasks are running normally.
   
   
![Image](https://github.com/user-attachments/assets/90ecc5c8-ba6e-434b-88ab-54a841ae7c4f)
   
   
![Image](https://github.com/user-attachments/assets/bf042ce1-521a-4c17-a216-27996fcdb568)
   
   **Stacktrace of ods_rbs_hudi_mor_part_test_02_01:**
   https://gist.github.com/Toroidals/2b64f7bf1b42cc58e98bc8129848cf04
   

