Toroidals opened a new issue, #13000: URL: https://github.com/apache/hudi/issues/13000
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? y
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Primary key: `id`
Precombine field: `ts_ms` (a 13-digit timestamp in milliseconds)

```java
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), HoodieAvroRecordMerger.class.getName());
```

Scenario 1: When using Flink to write to a Hudi MOR table, if a batch of new records with the same primary key (`id`) is submitted (where this primary key does not yet exist in the Hudi table), Hudi merges the records based on the `ts_ms` value. If multiple records have the same `ts_ms`, the merge follows the order in which the records arrived. This is the expected behavior.

Scenario 2: When using Flink to write to a Hudi MOR table, if a batch of records with the same primary key is submitted (where this primary key already exists in the Hudi table), Hudi appears to select one of the records at random. This is not the expected outcome.

Issue description: When multiple records with the same primary key (`id`) are submitted in a batch, and the Hudi table already contains a record with this primary key, the merge does not strictly follow the order given by the `ts_ms` values or the message arrival sequence. Instead, it appears to select one record at random (which could be either a newly inserted record or the existing one).
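The merge rule expected in Scenario 1 can be sketched as follows. This is only a minimal illustration of the ordering semantics described above, not Hudi's actual `EventTimeAvroPayload` implementation; the `Rec` shape and field names are hypothetical.

```java
import java.util.List;

// Sketch of the expected precombine rule: among records sharing a primary
// key, the highest ts_ms wins, and on equal ts_ms the later arrival wins.
public class PrecombineSketch {
    // Hypothetical record shape; only the fields needed for ordering.
    record Rec(int version, long tsMs) {}

    static Rec precombine(List<Rec> arrivalOrder) {
        Rec winner = null;
        for (Rec r : arrivalOrder) {
            // ">=" makes the later arrival win a ts_ms tie
            if (winner == null || r.tsMs() >= winner.tsMs()) {
                winner = r;
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        // Same ts_ms values as in the description above
        List<Rec> batch = List.of(
                new Rec(1, 1741022687061L),
                new Rec(2, 1741022687062L),
                new Rec(3, 1741022687062L)); // ties with version 2, arrives later
        System.out.println(precombine(batch).version()); // prints 3
    }
}
```

Scenario 1 matches this rule; the report is that Scenario 2 (the key already exists in the table) does not.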
flink conf:

```java
HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
options.put("catalog.path", "hdfs:///apps/hudi/catalog/");
String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap,
        new TypeReference<ArrayList<ArrayList<String>>>() {
        });
log.info("fieldList: {}", fieldList.toString());
for (ArrayList<String> columnList : fieldList) {
    builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
}
String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
builder.pk(hudiPrimaryKeys);
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
// The three options relevant to this issue:
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), HoodieAvroRecordMerger.class.getName());
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), infoMap.get("hudi_compaction_max_memory"));
options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), "thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), "jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");
options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");
options.put(FlinkOptions.WRITE_TASKS.key(), "8"); // String value: the map is Map<String, String>
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
builder.options(options);
return builder;
```

**To Reproduce**

Steps to reproduce the behavior:

1. Build test data:

```java
package com.hand.kafka.application;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

public class FlinkKafkaProducerTest {

    public static Random random = new Random();

    public static void main(String[] args) throws Exception {
        ...
```
```java
        // (FlinkKafkaProducerTest.main, continued)
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Integer id = 1;
        Long tsMs1 = 1741022687061L;
        Long tsMs2 = 1741022687062L;
        Long tsMs4 = 1741022687050L;
        Long tsMs5 = 1741022687000L;
        while (true) {
            String message1 = "{\"op\":\"c\",\"ts_ms\":" + tsMs1 + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            String message2 = "{\"op\":\"u\",\"ts_ms\":" + tsMs2 + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            String message3 = "{\"op\":\"u\",\"ts_ms\":" + tsMs2 + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            String message4 = "{\"op\":\"d\",\"ts_ms\":" + tsMs4 + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":4,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":4,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            Integer id5 = getRightId(id);
            Integer id6 = getRightId(id);
            String message5 = "{\"op\":\"c\",\"ts_ms\":" + tsMs5 + ",\"before\":{\"ID\":\"" + id5 + "\",\"VERSION\":5,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id5 + "\",\"VERSION\":5,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            String message6 = "{\"op\":\"u\",\"ts_ms\":" + tsMs5 + ",\"before\":{\"ID\":\"" + id6 + "\",\"VERSION\":6,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id6 + "\",\"VERSION\":6,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            Integer id7 = getLeftId(id);
            Integer id8 = getLeftId(id);
            String message7 = "{\"op\":\"u\",\"ts_ms\":" + tsMs5 + ",\"before\":{\"ID\":\"" + id7 + "\",\"VERSION\":7,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id7 + "\",\"VERSION\":7,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            String message8 = "{\"op\":\"d\",\"ts_ms\":" + tsMs5 + ",\"before\":{\"ID\":\"" + id8 + "\",\"VERSION\":8,\"CURRENT_ID\":\"" + id + "\"},\"after\":{\"ID\":\"" + id8 + "\",\"VERSION\":8,\"CURRENT_ID\":\"" + id + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
            ArrayList<ProducerRecord<String, String>> producerRecords = new ArrayList<>();
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message8));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message1));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message2));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message5));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message3));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message6));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message7));
            producerRecords.add(new ProducerRecord<>(topicName, "key-", message4));
            for (ProducerRecord<String, String> record : producerRecords) {
                producer.send(record);
            }
            producer.flush();
            Thread.sleep(random.nextInt(6) * 500);
            id += 1;
            tsMs1 = System.currentTimeMillis() + random.nextInt(10);
            tsMs2 = System.currentTimeMillis() + 100;
            tsMs4 = System.currentTimeMillis() + random.nextInt(10);
            tsMs5 = System.currentTimeMillis() - 500000;
        }
    }

    public static Integer getRightId(Integer id) {
        return random.nextInt(5) + id;
    }

    public static Integer getLeftId(Integer id) {
        return id - random.nextInt(5);
    }
}
```

2. Run two programs to consume data from this topic; the data written to Hudi in the two tables is inconsistent, even though both use:
   - the same Hudi configuration
   - the same table schema
   - the same Hudi writing program

3. Query the abnormal data:

```sql
select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_01 where id = 247;

select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_02 where id = 247;
```

*(screenshot of the two query results)*

4. Records filtered from topic consumption:

```json
{"op":"c","ts_ms":1742383356691,"before":{"ID":"247","VERSION":5,"CURRENT_ID":"243"},"after":{"ID":"247","VERSION":5,"CURRENT_ID":"243"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"c","ts_ms":1742383357692,"before":{"ID":"247","VERSION":5,"CURRENT_ID":"244"},"after":{"ID":"247","VERSION":5,"CURRENT_ID":"244"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"u","ts_ms":1742383358195,"before":{"ID":"247","VERSION":6,"CURRENT_ID":"246"},"after":{"ID":"247","VERSION":6,"CURRENT_ID":"246"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"d","ts_ms":1742383360196,"before":{"ID":"247","VERSION":8,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":8,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"c","ts_ms":1742383860205,"before":{"ID":"247","VERSION":1,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":1,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
```
```json
{"op":"u","ts_ms":1742383860296,"before":{"ID":"247","VERSION":2,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":2,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"u","ts_ms":1742383860296,"before":{"ID":"247","VERSION":3,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":3,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"u","ts_ms":1742383360196,"before":{"ID":"247","VERSION":7,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":7,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"d","ts_ms":1742383860199,"before":{"ID":"247","VERSION":4,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":4,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"d","ts_ms":1742383362698,"before":{"ID":"247","VERSION":8,"CURRENT_ID":"248"},"after":{"ID":"247","VERSION":8,"CURRENT_ID":"248"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
{"op":"d","ts_ms":1742383364702,"before":{"ID":"247","VERSION":8,"CURRENT_ID":"249"},"after":{"ID":"247","VERSION":8,"CURRENT_ID":"249"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
```

**Expected behavior**

Expected final result: version=3

```json
{"op":"u","ts_ms":1742383860296,"before":{"ID":"247","VERSION":3,"CURRENT_ID":"247"},"after":{"ID":"247","VERSION":3,"CURRENT_ID":"247"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
```

instead of version=8:

```json
{"op":"d","ts_ms":1742383364702,"before":{"ID":"247","VERSION":8,"CURRENT_ID":"249"},"after":{"ID":"247","VERSION":8,"CURRENT_ID":"249"},"source":{"schema":"db","tenant":"FSSC","db":null,"table":"table_name_01"},"transaction":null}
```

Observation: both tables follow the same writing rules, but the results are different.

Issue: during the merge process, the data is not merged strictly according to the specified rules, leading to randomness in the final outcome.

**Environment Description**

* Hudi version : 1.0.0
* Flink version : 1.15.2
* Hive version : 3.1.3
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) :
* Running on Docker? (yes/no) :

**Additional context**

Add any other context about the problem here.

**Stacktrace**

```Add the stacktrace of the error.```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
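For reference, applying the rule stated in Scenario 1 (highest `ts_ms` wins, arrival order breaks ties) to the id=247 records listed under step 4 yields the expected version=3. A small sketch, not Hudi code; the `Event` shape is hypothetical:

```java
import java.util.List;

// Derive the expected winner for id=247: keep the record with the highest
// ts_ms, breaking ties in favor of the later arrival.
public class ExpectedWinner {
    record Event(String op, long tsMs, int version) {}

    static Event merge(List<Event> arrivalOrder) {
        Event winner = null;
        for (Event e : arrivalOrder) {
            if (winner == null || e.tsMs() >= winner.tsMs()) { // ">=": later arrival wins ties
                winner = e;
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        // The eleven id=247 events from step 4, in topic order
        List<Event> events = List.of(
                new Event("c", 1742383356691L, 5),
                new Event("c", 1742383357692L, 5),
                new Event("u", 1742383358195L, 6),
                new Event("d", 1742383360196L, 8),
                new Event("c", 1742383860205L, 1),
                new Event("u", 1742383860296L, 2),
                new Event("u", 1742383860296L, 3), // ties with version 2, arrives later
                new Event("u", 1742383360196L, 7),
                new Event("d", 1742383860199L, 4),
                new Event("d", 1742383362698L, 8),
                new Event("d", 1742383364702L, 8));
        System.out.println(merge(events).version()); // prints 3
    }
}
```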
