ricky2129 opened a new issue, #10641: URL: https://github.com/apache/seatunnel/issues/10641
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

Currently, the `Metadata` transform exposes `EventTime` and `Delay` for CDC sources. For downstream deduplication (e.g. CDC → S3 Parquet → StarRocks/Athena), users need a monotonically increasing, per-row unique key to identify the latest state of a record.

`EventTime` is insufficient for this purpose because:

1. Multiple CDC events for the same primary key can share the same millisecond timestamp when a MySQL statement affects multiple rows or two statements execute within 1 ms.
2. `c_event_time - c_delay` (a common pattern) collapses ordering further through arithmetic coincidence: different delay values can produce identical results.

Example:

```json
{"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441039, "c_delay": 39, "c_db_change_time": 1774342441000}
{"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441040, "c_delay": 40, "c_db_change_time": 1774342441000}
```

Both rows produce the same `c_db_change_time` despite being distinct CDC events.

The correct solution, used by AWS DMS via `AR_H_CHANGE_SEQ`, is to expose the binlog filename + position + row index, which form a strict total order guaranteed by MySQL's binary log protocol. These are already present in the Debezium `SourceRecord` source struct (`file`, `pos`, `row`) but are never surfaced to the SeaTunnel metadata layer.

### Usage Scenario

Any CDC pipeline that writes to a data lake (S3 Parquet, Iceberg) and needs to deduplicate or find the latest state of a record in downstream queries:

```hocon
Metadata {
  metadata_fields {
    BinlogFile = c_binlog_file  // e.g. "mysql-bin.000123"
    BinlogPos  = c_binlog_pos   // e.g. 4857392 (BIGINT)
    BinlogRow  = c_binlog_row   // e.g. 0 (INT, row index within event)
  }
}
```

Downstream dedup key in StarRocks/Athena:

```sql
CONCAT(
  COALESCE(c_binlog_file, ''), ':',
  LPAD(CAST(COALESCE(c_binlog_pos, 0) AS STRING), 20, '0'), ':',
  LPAD(CAST(COALESCE(c_binlog_row, 0) AS STRING), 10, '0')
) AS c_change_seq
```

### Related issues

_No response_

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
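
To make the request in this issue concrete, here is a minimal Python sketch of the timestamp collision and of deduplication on a binlog-based key. It is an illustration only, not SeaTunnel code: the helper names `change_seq` and `latest_per_key`, the `id` and `val` columns, and the second binlog position are all hypothetical. The event times and delays are taken from the JSON example in the issue, and the key layout follows the `CONCAT`/`LPAD` expression proposed there.

```python
from typing import Any, Dict, List, Optional


def change_seq(binlog_file: Optional[str],
               binlog_pos: Optional[int],
               binlog_row: Optional[int]) -> str:
    """Build a sortable key: file, then 20-digit position, then 10-digit row index.

    Roughly mirrors the SQL expression in the issue (COALESCE to '' or 0, then
    left-pad with zeros). Unlike SQL LPAD, rjust never truncates longer values.
    """
    file_part = binlog_file or ""
    pos_part = str(binlog_pos or 0).rjust(20, "0")
    row_part = str(binlog_row or 0).rjust(10, "0")
    return f"{file_part}:{pos_part}:{row_part}"


def latest_per_key(events: List[Dict[str, Any]], key: str = "id") -> Dict[Any, Dict[str, Any]]:
    """Keep, for each primary key, the event with the greatest change sequence."""
    latest: Dict[Any, tuple] = {}
    for event in events:
        seq = change_seq(event.get("c_binlog_file"),
                         event.get("c_binlog_pos"),
                         event.get("c_binlog_row"))
        current = latest.get(event[key])
        if current is None or seq > current[0]:
            latest[event[key]] = (seq, event)
    return {k: v[1] for k, v in latest.items()}


# Two updates to the same row. Times and delays come from the issue's example;
# the binlog coordinates and the id/val columns are invented for illustration.
events = [
    {"id": 1, "val": "a", "c_event_time": 1774342441039, "c_delay": 39,
     "c_binlog_file": "mysql-bin.000123", "c_binlog_pos": 4857392, "c_binlog_row": 0},
    {"id": 1, "val": "b", "c_event_time": 1774342441040, "c_delay": 40,
     "c_binlog_file": "mysql-bin.000123", "c_binlog_pos": 4857731, "c_binlog_row": 0},
]

# event_time - delay collapses both events onto one timestamp.
derived = {e["c_event_time"] - e["c_delay"] for e in events}
print("distinct derived timestamps:", len(derived))

# The binlog-based key still separates them, regardless of arrival order.
print("latest value for id=1:", latest_per_key(events)[1]["val"])
```

Comparing the keys as strings only matches binlog order while the numeric suffix of the binlog filename keeps a fixed width; that is an assumption of this sketch, and a production key might need to parse the suffix into a number instead.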
