ricky2129 opened a new issue, #10641:
URL: https://github.com/apache/seatunnel/issues/10641

   ### Search before asking
   
   - [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   Currently, the `Metadata` transform exposes `EventTime` and `Delay` for CDC sources. For downstream deduplication (e.g. CDC → S3 Parquet → StarRocks/Athena), users need a monotonically increasing, per-row unique key to identify the latest state of a record.
   
   `EventTime` is insufficient for this purpose because:

   1. Multiple CDC events for the same primary key can share the same millisecond timestamp, for example when a single MySQL statement affects multiple rows or when two statements execute within 1 ms.
   2. `c_event_time - c_delay` (a common pattern) collapses ordering further through arithmetic coincidence: different delay values can produce identical results.
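Point 2 is easy to reproduce by evaluating the `c_event_time - c_delay` pattern on the two events from the example below. This is only an illustration in plain Python; `db_change_time` is a hypothetical helper standing in for whatever expression a pipeline actually uses.

```python
# The two example events from this issue (values copied from the JSON example).
events = [
    {"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441039, "c_delay": 39},
    {"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441040, "c_delay": 40},
]


def db_change_time(event):
    """Hypothetical helper: the common `c_event_time - c_delay` pattern."""
    return event["c_event_time"] - event["c_delay"]


change_times = [db_change_time(e) for e in events]
print(change_times)  # [1774342441000, 1774342441000]

# Two distinct events, one derived timestamp: this value cannot break the tie.
print(events[0] != events[1], len(set(change_times)) == 1)  # True True
```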
   
   Example:

   ```json
   {"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441039, "c_delay": 39, "c_db_change_time": 1774342441000}
   {"c_operation_type": "UPDATE_AFTER", "c_event_time": 1774342441040, "c_delay": 40, "c_db_change_time": 1774342441000}
   ```

   Both rows produce the same `c_db_change_time` despite being distinct CDC events.
   
   The correct solution, used by AWS DMS via `AR_H_CHANGE_SEQ`, is to expose the binlog filename + position + row index, which together form a strict total order within a single server's binary log, as guaranteed by MySQL's binary log protocol. These values are already present in the Debezium `SourceRecord` source struct (`file`, `pos`, `row`) but are never surfaced to the SeaTunnel metadata layer.
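As a sketch of why `(file, pos, row)` can order events where timestamps cannot, the Python below sorts binlog positions by a tuple key. It assumes a single MySQL server's binlog and the usual `<basename>.<NNNNNN>` file naming with an increasing numeric suffix; `BinlogPosition` and `sort_key` are hypothetical names for illustration, not SeaTunnel or Debezium APIs.

```python
from typing import NamedTuple, Tuple


class BinlogPosition(NamedTuple):
    """Hypothetical holder for Debezium's source.file / source.pos / source.row."""

    file: str  # e.g. "mysql-bin.000123"
    pos: int   # position of the event within that file
    row: int   # row index within a multi-row event

    def sort_key(self) -> Tuple[int, int, int]:
        # Assumption: file names end in ".<number>" and the number increases
        # each time MySQL rotates to a new binlog file.
        _, _, seq = self.file.rpartition(".")
        return (int(seq), self.pos, self.row)


a = BinlogPosition("mysql-bin.000123", 4857392, 0)
b = BinlogPosition("mysql-bin.000123", 4857392, 1)  # next row of the same event
c = BinlogPosition("mysql-bin.000124", 1024, 0)     # positions restart in a new file

ordered = sorted([c, b, a], key=BinlogPosition.sort_key)
print([p.sort_key() for p in ordered])
# [(123, 4857392, 0), (123, 4857392, 1), (124, 1024, 0)]
```

Parsing the numeric suffix keeps the comparison correct even if the suffix were ever to grow wider, whereas a plain string comparison of file names relies on the suffix width staying fixed.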
   
   
   ### Usage Scenario
   
   Any CDC pipeline that writes to a data lake (S3 Parquet, Iceberg) and needs to deduplicate or find the latest state of a record in downstream queries:
   
   ```hocon
   Metadata {
     metadata_fields {
       BinlogFile = c_binlog_file   // e.g. "mysql-bin.000123"
       BinlogPos  = c_binlog_pos    // e.g. 4857392 (BIGINT)
       BinlogRow  = c_binlog_row    // e.g. 0 (INT, row index within event)
     }
   }
   ```
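The proposed keys would essentially rename fields that Debezium already places in the MySQL `source` struct. Below is a minimal Python sketch of that mapping, assuming the record's `source` struct is available as a dict; `PROPOSED_KEYS` and `extract_binlog_metadata` are hypothetical and do not reflect SeaTunnel's actual transform code.

```python
from typing import Any, Dict, Mapping

# Hypothetical: each metadata key proposed in this issue, mapped to the
# Debezium MySQL source-struct field it would be read from.
PROPOSED_KEYS = {"BinlogFile": "file", "BinlogPos": "pos", "BinlogRow": "row"}


def extract_binlog_metadata(
    source: Mapping[str, Any], metadata_fields: Mapping[str, str]
) -> Dict[str, Any]:
    """Return {output_column: value} for each configured binlog metadata key."""
    out: Dict[str, Any] = {}
    for key, column in metadata_fields.items():
        field = PROPOSED_KEYS.get(key)
        if field is not None:                # ignore keys this sketch doesn't handle
            out[column] = source.get(field)  # None if the field is absent
    return out


# A trimmed-down Debezium MySQL `source` struct, represented as a dict.
source = {"file": "mysql-bin.000123", "pos": 4857392, "row": 0, "server_id": 1}
config = {"BinlogFile": "c_binlog_file", "BinlogPos": "c_binlog_pos", "BinlogRow": "c_binlog_row"}
print(extract_binlog_metadata(source, config))
# {'c_binlog_file': 'mysql-bin.000123', 'c_binlog_pos': 4857392, 'c_binlog_row': 0}
```

Missing fields come back as `None` (SQL `NULL`), which is the case the `COALESCE` calls in the downstream key are there to handle.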
   
   Downstream dedup key in StarRocks/Athena:

   ```sql
   CONCAT(
     COALESCE(c_binlog_file, ''), ':',
     LPAD(CAST(COALESCE(c_binlog_pos, 0) AS VARCHAR), 20, '0'), ':',
     LPAD(CAST(COALESCE(c_binlog_row, 0) AS VARCHAR), 10, '0')
   ) AS c_change_seq
   ```
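The dedup key only sorts correctly as a string because `pos` and `row` are zero-padded to a fixed width and the file names share a common prefix with a fixed-width suffix. The Python below mirrors the expression and adds a "latest row per primary key" pass to show why the padding matters; `change_seq`, `latest_per_key`, and the `id` column are hypothetical.

```python
def change_seq(binlog_file, binlog_pos, binlog_row) -> str:
    """Python mirror of the CONCAT/LPAD expression (NULL -> '' or 0)."""
    file_part = binlog_file if binlog_file is not None else ""
    pos_part = str(binlog_pos if binlog_pos is not None else 0).rjust(20, "0")
    row_part = str(binlog_row if binlog_row is not None else 0).rjust(10, "0")
    return f"{file_part}:{pos_part}:{row_part}"


def latest_per_key(rows, pk="id"):
    """Keep, for each primary key, the row with the greatest change sequence."""
    latest = {}
    for r in rows:
        seq = change_seq(r["c_binlog_file"], r["c_binlog_pos"], r["c_binlog_row"])
        current = latest.get(r[pk])
        if current is None or seq > current[0]:
            latest[r[pk]] = (seq, r)
    return {key: row for key, (seq, row) in latest.items()}


rows = [
    {"id": 1, "name": "v1", "c_binlog_file": "mysql-bin.000123", "c_binlog_pos": 999, "c_binlog_row": 0},
    {"id": 1, "name": "v2", "c_binlog_file": "mysql-bin.000123", "c_binlog_pos": 4857392, "c_binlog_row": 0},
]
print(latest_per_key(rows)[1]["name"])  # v2
```

Without the padding, `"999" > "4857392"` compares as true, so the older event `v1` would win.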
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

