Toroidals commented on issue #13000:
URL: https://github.com/apache/hudi/issues/13000#issuecomment-2738949582
> I see for your test, the source records themselves are kind of randomly generated; for every batch of inputs, the arrival sequence is already different. Maybe you should make the inputs an explicit sequence first.

@danny0405
**Build test data:** According to this logic, when writing to the topic, the final write order for each ID arriving in Hudi should be version = 3, followed by version = 2, and finally version = 1.
```java
public class FlinkKafkaProducerTest02 {

    public static Random random = new Random();

    public static void main(String[] args) throws Exception {
        ...
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        long tsMs1;
        try {
            for (int id = 0; id < 100000; id++) {
                tsMs1 = System.currentTimeMillis();
                // VERSION = 1 for ID (id - 1)
                String message1 = "{\"op\":\"c\",\"ts_ms\":" + tsMs1
                        + ",\"before\":{\"ID\":\"" + (id - 1) + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id
                        + "\"},\"after\":{\"ID\":\"" + (id - 1) + "\",\"VERSION\":1,\"CURRENT_ID\":\"" + id
                        + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
                // VERSION = 2 for ID id
                String message2 = "{\"op\":\"u\",\"ts_ms\":" + tsMs1
                        + ",\"before\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id
                        + "\"},\"after\":{\"ID\":\"" + id + "\",\"VERSION\":2,\"CURRENT_ID\":\"" + id
                        + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
                // VERSION = 3 for ID (id + 1)
                String message3 = "{\"op\":\"u\",\"ts_ms\":" + tsMs1
                        + ",\"before\":{\"ID\":\"" + (id + 1) + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id
                        + "\"},\"after\":{\"ID\":\"" + (id + 1) + "\",\"VERSION\":3,\"CURRENT_ID\":\"" + id
                        + "\"},\"source\":{\"schema\":\"db\",\"tenant\":\"FSSC\",\"db\":null,\"table\":\"table_name_01\"},\"transaction\":null}";
                // All three records share the same key, so they land in the same
                // partition in send order.
                ProducerRecord<String, String> record1 = new ProducerRecord<>(topicName, "key-", message1);
                ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName, "key-", message2);
                ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName, "key-", message3);
                producer.send(record1);
                producer.send(record2);
                producer.send(record3);
                producer.flush();
                Thread.sleep(random.nextInt(5) * 1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the producer is also released on normal completion,
            // not only when an exception is thrown.
            producer.close();
        }
    }
}
```
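As a sanity check on the generator logic above, the per-ID arrival order can be derived without Kafka: iteration `id` emits VERSION 1 for ID `id - 1`, VERSION 2 for ID `id`, and VERSION 3 for ID `id + 1`, so a given ID sees versions 3, 2, 1 across three consecutive iterations. A minimal standalone sketch (plain Java, hypothetical class name, no Kafka involved):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: replays the (iteration, ID, VERSION) schedule of the
// producer loop above and returns the arrival order of versions for one ID.
public class ArrivalOrderCheck {

    static List<Integer> arrivalOrder(int targetId) {
        List<Integer> versionsSeen = new ArrayList<>();
        // iteration id emits (id - 1, v1), (id, v2), (id + 1, v3), in that order
        for (int id = targetId - 1; id <= targetId + 1; id++) {
            int[][] emitted = {{id - 1, 1}, {id, 2}, {id + 1, 3}};
            for (int[] rec : emitted) {
                if (rec[0] == targetId) {
                    versionsSeen.add(rec[1]);
                }
            }
        }
        return versionsSeen;
    }

    public static void main(String[] args) {
        // For ID 57: version 3 arrives in iteration 56, version 2 in 57,
        // version 1 in 58.
        System.out.println(arrivalOrder(57)); // prints [3, 2, 1]
    }
}
```

Since `tsMs1` is taken once per iteration and iterations are separated by sleeps, the ts_ms values attached to versions 3, 2, 1 for a given ID are non-decreasing in that arrival order.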
**Hudi sink:**
```java
HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
options.put("catalog.path", "hdfs:///apps/hudi/catalog/");
String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
ArrayList<ArrayList> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList>>() {
});
log.info("fieldList: {}", fieldList.toString());
for (ArrayList columnList : fieldList) {
    builder.column("" + columnList.get(0) + " " + columnList.get(1));
}
builder.pk("id");
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts_ms");
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), OverwriteWithLatestMerger.class.getName());
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "16");
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), "SIMPLE");
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_or_time");
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "10");
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "30");
options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), "100");
options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
        "thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(),
        "jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");
options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");
// quoted: options is a Map<String, String>, so a bare int does not compile
options.put(FlinkOptions.WRITE_TASKS.key(), "8");
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
builder.options(options);
return builder;
```
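The column-building loop above concatenates each `[name, type]` pair from `hudi_field_map` into the DDL fragment passed to `builder.column(...)`. A standalone sketch of just that string-building step (the sample field list is hypothetical, shaped like the parsed `hudi_field_map`; no Hudi or fastjson dependency):

```java
import java.util.List;

// Standalone sketch of the column-building loop: each inner list is
// [name, type], joined into the DDL fragment handed to builder.column(...).
public class ColumnBuilderSketch {

    static String columnDdl(List<String> columnList) {
        // mirrors: "" + columnList.get(0) + " " + columnList.get(1)
        return "" + columnList.get(0) + " " + columnList.get(1);
    }

    public static void main(String[] args) {
        // hypothetical sample matching the shape of the parsed hudi_field_map
        List<List<String>> fieldList = List.of(
                List.of("id", "string"),
                List.of("version", "int"),
                List.of("ts_ms", "bigint"),
                List.of("part_dt", "string"));
        for (List<String> columnList : fieldList) {
            System.out.println(columnDdl(columnList)); // e.g. "id string"
        }
    }
}
```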
**Query result of table ods_rbs_hudi_mor_part_test_02_01:**

```sql
select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_01
where version not in (1);
```

**Query result of table ods_rbs_hudi_mor_part_test_02_02:**

```sql
select id, CURRENT_ID, version, `_flink_cdc_op`, `_flink_cdc_ts_ms`, `_flink_cdc_table`
from ods_rbs.ods_rbs_hudi_mor_part_test_02_02
where version not in (1);
```

The expected result is that version = 1 data should not appear in the Hudi table (except for the last two records). It is evident that the records with id = 56 and id = 57 in table ods_rbs_hudi_mor_part_test_02_01 are abnormal. The query result for id = 56 and id = 57 in table ods_rbs_hudi_mor_part_test_02_02 is as follows:

This is the expected result. The filtered data from the topic for id = 56 and id = 57 is as follows:

Both tasks are running normally.


**Stacktrace of ods_rbs_hudi_mor_part_test_02_01:**
https://gist.github.com/Toroidals/2b64f7bf1b42cc58e98bc8129848cf04
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]