Toroidals opened a new issue, #12996:
URL: https://github.com/apache/hudi/issues/12996

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? y
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Primary Key: id
   precombine.field: ts_ms (ts_ms is a 13-digit timestamp in milliseconds)
   
   Scenario 1:
   The Hudi MOR table contains a record: id=1, version=1, ts_ms=1741022687053.
   A new record is submitted: id=1, version=2, ts_ms=1741022687053 (same ts_ms 
as the existing record).
   The merge behaves as expected, and the final result is:
   id=1, version=2, ts_ms=1741022687053
   Scenario 2:
   The Hudi MOR table contains a record: id=1, version=1, ts_ms=1741022687053.
   Two new records are submitted:
   id=1, version=2, ts_ms=1741022687054 (ts_ms is 1 millisecond greater than 
the existing record).
   id=1, version=3, ts_ms=1741022687054 (same ts_ms as the first new record).
   Expected merge result:
   id=1, version=3, ts_ms=1741022687054 (latest version with the same ts_ms 
should be retained).
   However, sometimes the result is:
   id=1, version=2, ts_ms=1741022687054, which is not expected.
   Issue:
   When multiple records with the same primary key (id) and the same ts_ms are 
submitted in a batch, the merge process does not strictly follow the arrival 
order of the messages. Instead, it appears to randomly pick one of the records 
from the batch.
   
   
   flink conf:
   HoodiePipeline.Builder builder = 
HoodiePipeline.builder(infoMap.get("hudi_table_name"));
   Map<String, String> options = new HashMap<>();
   options.put(FlinkOptions.DATABASE_NAME.key(), 
infoMap.get("hudi_database_name"));
   options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
   options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
   options.put("catalog.path", "hdfs:///apps/hudi/catalog/");
   String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
   ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new 
TypeReference<ArrayList<ArrayList<String>>>() {
   });
   log.info("fieldList: {}",  fieldList.toString());
   for (ArrayList<String> columnList : fieldList) {
       builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
   }
   String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
   builder.pk(hudiPrimaryKeys);
   
   options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
   **options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), 
EventTimeAvroPayload.class.getName());
   options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), 
HoodieAvroRecordMerger.class.getName());**
   
   options.put(FlinkOptions.TABLE_TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name());
   options.put(FlinkOptions.INDEX_TYPE.key(), 
HoodieIndex.IndexType.BUCKET.name());
   options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), 
infoMap.get("hudi_bucket_index_num_buckets"));
   options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), 
infoMap.get("hudi_bucket_index_engine_type"));
   
   options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), 
infoMap.get("hudi_compaction_trigger_strategy"));
   options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 
infoMap.get("hudi_compaction_delta_commits"));
   options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), 
infoMap.get("hudi_compaction_delta_seconds"));
   options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), 
infoMap.get("hudi_compaction_max_memory"));
   
   options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
   options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
   options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
   options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
   options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
   options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
   options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
   options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), 
"thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
   options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), 
"jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
   options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
   options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
   
   options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
   options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");
   
   options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");
   
   options.put(FlinkOptions.WRITE_TASKS.key(), 8);
   
   options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
   
   builder.options(options);
   return builder;
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1.
   2.
   3.
   4.
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version : 1.0.0
   
   * Flink version : 1.15.2
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.4
   
   * Storage (HDFS/S3/GCS..) :
   
   * Running on Docker? (yes/no) :
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to