Toroidals opened a new issue, #12996: URL: https://github.com/apache/hudi/issues/12996
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? y
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Primary key: `id`
Precombine field: `ts_ms` (a 13-digit timestamp in milliseconds)

**Scenario 1 (works as expected):**

The Hudi MOR table contains the record `id=1, version=1, ts_ms=1741022687053`. A new record is submitted: `id=1, version=2, ts_ms=1741022687053` (same `ts_ms` as the existing record). The merge behaves as expected, and the final result is `id=1, version=2, ts_ms=1741022687053`.

**Scenario 2 (unexpected result):**

The Hudi MOR table contains the record `id=1, version=1, ts_ms=1741022687053`. Two new records are submitted in the same batch:

1. `id=1, version=2, ts_ms=1741022687054` (`ts_ms` is 1 millisecond greater than the existing record)
2. `id=1, version=3, ts_ms=1741022687054` (same `ts_ms` as the first new record)

Expected merge result: `id=1, version=3, ts_ms=1741022687054` (the latest of the records sharing the same `ts_ms` should be retained). However, the result is sometimes `id=1, version=2, ts_ms=1741022687054`, which is not expected.

**Issue:** When multiple records with the same primary key (`id`) and the same `ts_ms` are submitted in one batch, the merge does not strictly follow the arrival order of the messages; instead, it appears to pick one of the tied records arbitrarily.
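The tie behavior described above can be illustrated outside of Hudi with a minimal sketch (plain Java; the `Rec` record and the merge helpers are hypothetical stand-ins, not Hudi APIs). One plausible mechanism for the observed result: if the merge comparator uses a strict greater-than on the precombine field, the incumbent record wins on a tie, so arrival order within a batch is not preserved.

```java
import java.util.List;

public class PrecombineTieSketch {

    // Minimal stand-in for a record: primary key id, payload version,
    // and the precombine/ordering field ts_ms.
    record Rec(int id, int version, long tsMs) {}

    // Strict '>' tie-break: on equal ts_ms the incumbent survives,
    // so arrival order within a batch is NOT preserved.
    static Rec mergeKeepFirstOnTie(Rec current, Rec incoming) {
        return incoming.tsMs() > current.tsMs() ? incoming : current;
    }

    // '>=' tie-break (last-writer-wins): on equal ts_ms the later
    // arrival survives, preserving arrival order within a batch.
    static Rec mergeKeepLastOnTie(Rec current, Rec incoming) {
        return incoming.tsMs() >= current.tsMs() ? incoming : current;
    }

    static Rec reduce(List<Rec> batch, java.util.function.BinaryOperator<Rec> merge) {
        return batch.stream().reduce(merge).orElseThrow();
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec(1, 1, 1741022687053L),
                new Rec(1, 2, 1741022687054L),
                new Rec(1, 3, 1741022687054L)); // same ts_ms as version=2

        // Strict '>' tie-break: version=2 survives (the unexpected result):
        System.out.println(reduce(batch, PrecombineTieSketch::mergeKeepFirstOnTie).version()); // 2
        // '>=' tie-break: version=3 survives (the expected result):
        System.out.println(reduce(batch, PrecombineTieSketch::mergeKeepLastOnTie).version()); // 3
    }
}
```

This is only a model of the symptom, not Hudi's actual merge code; it shows why a unique ordering value (or an explicit tie-break on arrival order) is needed to make the outcome deterministic.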
**Flink conf:**

```java
HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
options.put("catalog.path", "hdfs:///apps/hudi/catalog/");

// Build the column list from a JSON array of [name, type] pairs.
String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
ArrayList<ArrayList<String>> fieldList =
        JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {});
log.info("fieldList: {}", fieldList);
for (ArrayList<String> columnList : fieldList) {
    builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
}

String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
builder.pk(hudiPrimaryKeys);

options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
// Payload/merger configuration (emphasized by the reporter):
options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), HoodieAvroRecordMerger.class.getName());

options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), infoMap.get("hudi_compaction_max_memory"));
options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), "thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), "jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");
options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");
options.put(FlinkOptions.WRITE_TASKS.key(), "8"); // was the int literal 8; Map<String, String> requires a String value
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
builder.options(options);
return builder;
```

**To Reproduce**

Steps to reproduce the behavior: see Scenario 2 above.

**Expected behavior**

When records with the same primary key and the same `ts_ms` arrive in one batch, the last-arriving record (here `version=3`) should be retained.

**Environment Description**

* Hudi version : 1.0.0
* Flink version : 1.15.2
* Hive version : 3.1.3
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) :
* Running on Docker? (yes/no) :
