ricky2129 commented on issue #10641:
URL: https://github.com/apache/seatunnel/issues/10641#issuecomment-4120930350
@dybyte
## Design Doc: [Feature][CDC] Expose binlog file/position/row as metadata fields for deterministic downstream deduplication
---
### Problem Statement
The CDC → data lake pattern (MySQL-CDC → S3 Parquet / Iceberg / Delta) is
one of the most common SeaTunnel deployment topologies. In this pattern the
sink is append-only: every INSERT, UPDATE_AFTER, and DELETE from the source
lands as a new row in the lake. Downstream queries reconstruct the latest state
of each record using a deduplication query:
```sql
SELECT * FROM cdc_table
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY <ordering_key> DESC) = 1
```
**The ordering key is the crux of the problem.** It must satisfy two
properties:
1. **Uniqueness** — no two distinct CDC events can have the same value
2. **Total order** — the value must reflect the true sequence of changes in
the source database
The SeaTunnel `Metadata` transform (v2.3.12) exposes the following fields
for CDC sources:
| Field | Type | Description |
|---|---|---|
| Database | STRING | Database name containing the row |
| Table | STRING | Table name containing the row |
| RowKind | STRING | Operation type (INSERT, UPDATE_BEFORE, etc.) |
| EventTime | BIGINT | Time at which the **connector** processed the event (ms) |
| Delay | BIGINT | Difference between connector processing time and database change time (ms) |
| Partition | STRING | Partition field information |
None of these fields is suitable as a unique ordering key for deduplication.
---
### Why existing fields fail as an ordering key
**Failure mode 1: Same-statement multi-row events**
A single UPDATE statement matching N rows produces N CDC events, all with
identical `EventTime` — they are all processed by the connector at the same
instant. There is no way to distinguish their order using timestamp alone.
```sql
-- One statement, two rows touched → two CDC events, same EventTime
UPDATE payments SET status = 'settled' WHERE batch_id = 42;
```
After deduplication on `EventTime`, one row is dropped arbitrarily. The lake
has an inconsistent view.
**Failure mode 2: Sub-millisecond transactions**
Two separate transactions committing within the same millisecond share the
same `EventTime`. High-throughput OLTP workloads (payment processing, order
management) routinely produce thousands of transactions per second —
sub-millisecond commit intervals are not edge cases.
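The arithmetic behind this claim can be sketched in a few lines; the workload figure below is illustrative, not taken from the issue:

```java
// Back-of-envelope sketch with an illustrative TPS figure: at OLTP commit
// rates, millisecond-granularity timestamps must collide.
public class CommitRate {
    // Average committed transactions per millisecond at a given TPS.
    static int avgCommitsPerMs(int txnPerSecond) {
        return txnPerSecond / 1_000;
    }

    public static void main(String[] args) {
        // A hypothetical 5,000 TPS payment workload: on average five
        // transactions share every EventTime value, guaranteeing ties.
        System.out.println(avgCommitsPerMs(5_000) + " commits per ms");
    }
}
```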
**Failure mode 3: The `c_db_change_time` collapse**
A common pattern in SeaTunnel CDC jobs is to approximate the source database
change time by subtracting lag:
```sql
(c_event_time - c_delay) AS c_db_change_time
```
This can make the collision worse. Two events with slightly different
`EventTime` and `Delay` values produce identical `c_db_change_time`:
```json
{"c_event_time": 1774342441039, "c_delay": 39, "c_db_change_time": 1774342441000}
{"c_event_time": 1774342441040, "c_delay": 40, "c_db_change_time": 1774342441000}
```
These are two distinct committed changes. They are indistinguishable in the
lake.
**Failure mode 4: UPDATE_BEFORE and UPDATE_AFTER have the same EventTime**
For every UPDATE, SeaTunnel emits two rows: `UPDATE_BEFORE` and
`UPDATE_AFTER`. Both carry the same `EventTime` because they originate from the
same Debezium `SourceRecord`. A deduplication query using only timestamp cannot
determine which one represents the final state — it can and does pick
`UPDATE_BEFORE` as the current row.
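A minimal sketch of the tie (timestamps hypothetical): both images of one UPDATE carry a single EventTime, so a timestamp-only comparator reports equality and the engine is free to rank either image first:

```java
// Sketch: a descending-by-EventTime comparator cannot separate the
// UPDATE_BEFORE/UPDATE_AFTER pair emitted for one UPDATE.
public class UpdatePairTie {
    // Descending order by EventTime: newest change first.
    static int compareByEventTimeDesc(long a, long b) {
        return Long.compare(b, a);
    }

    public static void main(String[] args) {
        long updateBefore = 1774342441039L; // UPDATE_BEFORE image
        long updateAfter  = 1774342441039L; // UPDATE_AFTER, same SourceRecord
        // Result 0 means a tie: ROW_NUMBER() = 1 can land on either image.
        System.out.println(compareByEventTimeDesc(updateBefore, updateAfter));
    }
}
```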
---
### The correct solution: MySQL binlog coordinates
MySQL's binary log protocol (row-based replication, required for CDC)
provides an exact, stable, globally ordered position for every committed row
change.
For a committed transaction, the binlog guarantees:
- Each DML statement produces a separate `ROWS_EVENT` at a unique byte
offset (`pos`) in the binlog file (`file`)
- If one statement matches multiple rows, those rows share the same `pos`
but each gets a distinct `row` index (0, 1, 2…)
Therefore `(file, pos, row)` is **unique per committed row-change, across
all cases**:
| Scenario | `pos` | `row` |
|---|---|---|
| Two UPDATE statements, 1 row each | different | both `0` |
| One UPDATE matching 2 rows | same | `0` and `1` |
| Two UPDATEs on the same row in one transaction | different | both `0` |
| UPDATE_BEFORE vs UPDATE_AFTER | same | same — both from the same Debezium SourceRecord |
The last row is important: `UPDATE_BEFORE` and `UPDATE_AFTER` intentionally
share `(file, pos, row)`. The downstream query should retain `UPDATE_AFTER`
using `RowKind` as a tiebreaker — not a timestamp coin flip.
This is the same guarantee AWS DMS exposes via `AR_H_CHANGE_SEQ`, a
zero-padded encoding of `(log_file, log_pos, row_index)` that is
lexicographically sortable. Users migrating from DMS to SeaTunnel currently
have no equivalent.
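The DMS-style encoding can be sketched in a few lines of Java. The method name and padding widths (20 and 10, matching the SQL key shown later in this doc) are illustrative; the point is that zero-padding makes plain string comparison agree with numeric binlog order:

```java
// Sketch of a DMS-style change sequence: zero-pad pos and row so that
// lexicographic comparison of the keys matches the order of the changes.
public class ChangeSeq {
    static String encode(String file, long pos, int row) {
        return String.format("%s:%020d:%010d", file == null ? "" : file, pos, row);
    }

    public static void main(String[] args) {
        String a = encode("mysql-bin-changelog.309222", 54106803L, 0);
        String b = encode("mysql-bin-changelog.309222", 54106803L, 1); // same statement, next row
        String c = encode("mysql-bin-changelog.309222", 54107112L, 0); // later ROWS_EVENT
        // String order of the keys equals binlog order of the changes.
        System.out.println(a.compareTo(b) < 0 && b.compareTo(c) < 0);
    }
}
```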
These three values are already present in every Debezium `SourceRecord` for
MySQL-CDC, inside the `source` struct:
```
source=Struct{
version=1.9.8.Final,
connector=mysql,
file=mysql-bin-changelog.309222,
pos=54106803,
row=0,
...
}
```
They are not surfaced to SeaTunnel's metadata layer today. This proposal
adds them as opt-in fields in the `Metadata` transform with a minimal, fully
backward-compatible change.
Non-MySQL connectors (PostgreSQL, Oracle, SQL Server) do not carry `file`, `pos`, or `row` fields in their Debezium source struct. Each extractor guards on the presence of its own field in the source schema (e.g. `source.schema().field("file") == null`), so all three fields are `null` for non-MySQL sources with no behavioral change to those connectors.
---
### Design
#### Files changed
| File | Change |
|---|---|
| `seatunnel-api/.../CommonOptions.java` | Declare `BINLOG_FILE`, `BINLOG_POS`, `BINLOG_ROW` enum values |
| `seatunnel-api/.../MetadataUtil.java` | Add `setBinlogFile`, `setBinlogPos`, `setBinlogRow` setters |
| `.../cdc/base/utils/SourceRecordUtils.java` | Add `getBinlogFile`, `getBinlogPos`, `getBinlogRow` extractors |
| `.../debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java` | Populate fields on every emitted `SeaTunnelRow` |
| `.../cdc/base/source/IncrementalSource.java` | Register as advertised metadata columns |
#### `CommonOptions.java`
Three values are added after `DELAY`, each with `supportMetadataTrans = true`, the flag the `Metadata` transform checks to decide whether a field is available in its switch statement:
```java
BINLOG_FILE("BinlogFile", true),
BINLOG_POS("BinlogPos", true),
BINLOG_ROW("BinlogRow", true);
```
#### `SourceRecordUtils.java`
Three extractor methods, each null-safe for `null` value, `null` source
struct, and missing schema field:
```java
public static String getBinlogFile(SourceRecord record) {
Struct value = (Struct) record.value();
if (value == null) return null;
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source == null || source.schema().field("file") == null) return null;
return source.getString("file");
}
public static Long getBinlogPos(SourceRecord record) {
Struct value = (Struct) record.value();
if (value == null) return null;
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source == null || source.schema().field("pos") == null) return null;
return source.getInt64("pos");
}
public static Integer getBinlogRow(SourceRecord record) {
Struct value = (Struct) record.value();
if (value == null) return null;
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source == null || source.schema().field("row") == null) return null;
return source.getInt32("row");
}
```
#### `SeaTunnelRowDebeziumDeserializeSchema.java`
In `deserializeDataChangeRecord()`, binlog coordinates are extracted **once
per `SourceRecord`** (all rows within the same binlog event share the same
source struct), then set on every emitted row (INSERT, DELETE, UPDATE_BEFORE,
UPDATE_AFTER):
```java
// Extract binlog position once per record — same source struct for all rows in the event
String binlogFile = SourceRecordUtils.getBinlogFile(record);
Long binlogPos = SourceRecordUtils.getBinlogPos(record);
Integer binlogRow = SourceRecordUtils.getBinlogRow(record);
// set on each emitted SeaTunnelRow:
MetadataUtil.setBinlogFile(row, binlogFile);
MetadataUtil.setBinlogPos(row, binlogPos);
MetadataUtil.setBinlogRow(row, binlogRow);
```
#### `IncrementalSource.java`
```java
metadata.add(MetadataColumn.of(
        CommonOptions.BINLOG_FILE.getName(), BasicType.STRING_TYPE,
        (String) null, true, null, null));
metadata.add(MetadataColumn.of(
        CommonOptions.BINLOG_POS.getName(), BasicType.LONG_TYPE,
        (Long) null, true, null, null));
metadata.add(MetadataColumn.of(
        CommonOptions.BINLOG_ROW.getName(), BasicType.INT_TYPE,
        (Integer) null, true, null, null));
```
---
### Parallelism and checkpoint safety
`SeaTunnelRow.options` is populated inside `deserializeDataChangeRecord()`,
which runs within the source `TaskGroup`. All transforms are co-located in the
same `TaskGroup` via `IntermediateBlockingQueue` (in-memory, no serialization).
`SeaTunnelRow.options` is not part of the `RecordSerializer` wire format — only
`tableId`, `rowKind`, `arity`, and `fields[]` are serialized — so this change
has no impact on checkpoint state or cross-task-boundary data movement at any
parallelism level.
On checkpoint restore, the source re-emits from its `BinlogOffset` and
binlog coordinates are repopulated fresh from the new Debezium records. No
state migration is required.
---
### User-facing configuration
Using the same `Metadata` transform syntax as existing fields:
```hocon
Metadata {
plugin_input = "cdc_stream"
plugin_output = "cdc_with_meta"
metadata_fields {
BinlogFile = c_binlog_file # STRING — e.g. "mysql-bin-changelog.309222"
BinlogPos = c_binlog_pos # BIGINT — e.g. 54106803
BinlogRow = c_binlog_row # INT — e.g. 0
}
}
```
Downstream dedup key — lexicographically sortable, equivalent to AWS DMS
`AR_H_CHANGE_SEQ`:
```sql
CONCAT(
COALESCE(c_binlog_file, ''), ':',
LPAD(CAST(COALESCE(c_binlog_pos, 0) AS VARCHAR), 20, '0'), ':',
LPAD(CAST(COALESCE(c_binlog_row, 0) AS VARCHAR), 10, '0')
) AS c_change_seq
```
Latest-state deduplication query:
```sql
SELECT * EXCEPT(c_row_kind, c_change_seq)
FROM cdc_table
WHERE c_row_kind != 'UPDATE_BEFORE'  -- BEFORE image ties with AFTER on c_change_seq
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY c_change_seq DESC) = 1
    AND c_row_kind != 'DELETE'       -- keys whose latest change is a DELETE disappear
```
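The intended latest-state semantics can be sketched in plain Java over hypothetical events: keep the highest change sequence per key, let `RowKind` resolve the UPDATE_BEFORE/UPDATE_AFTER tie, and drop keys whose most recent change is a DELETE. The class and shortened sequence strings below are illustrative, not part of the proposal:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of latest-state deduplication over CDC events.
public class LatestState {
    record Event(int id, String rowKind, String changeSeq) {}

    static Map<Integer, Event> latest(List<Event> events) {
        Map<Integer, Event> best = new HashMap<>();
        for (Event e : events) {
            if ("UPDATE_BEFORE".equals(e.rowKind())) continue; // BEFORE image never wins
            best.merge(e.id(), e, (cur, nxt) ->
                    nxt.changeSeq().compareTo(cur.changeSeq()) >= 0 ? nxt : cur);
        }
        // A key whose most recent change is a DELETE has no current state.
        best.values().removeIf(e -> "DELETE".equals(e.rowKind()));
        return best;
    }

    public static void main(String[] args) {
        // Shortened stand-in change sequences; real keys are the padded
        // file:pos:row strings built above.
        Map<Integer, Event> state = latest(List.of(
                new Event(1, "INSERT",        "001"),
                new Event(1, "UPDATE_BEFORE", "002"),
                new Event(1, "UPDATE_AFTER",  "002"),  // same seq, AFTER wins
                new Event(2, "INSERT",        "003"),
                new Event(2, "DELETE",        "004"))); // key 2 is gone
        System.out.println(state);
    }
}
```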
---
### Alternatives considered
| Alternative | Why rejected |
|---|---|
| Single pre-formatted `ChangeSeq` STRING field | Loses type information; `BinlogPos` as BIGINT is independently useful as a numeric sort key |
| `EventTime` alone | Not unique — same value for multi-row events, and identical for UPDATE_BEFORE/UPDATE_AFTER pairs |
| Expose PostgreSQL LSN / Oracle SCN in the same PR | Different offset semantics per connector — better addressed per-connector in a follow-up |
---
### Compatibility
- **Existing jobs**: no impact — `options` entries are populated on every
row but only materialise as output columns when explicitly mapped via the
`Metadata` transform
- **Non-MySQL connectors**: all three fields return `null`, guarded by
schema field presence check
- **Snapshot phase**: fields are `null` for snapshot rows (Debezium source
struct has no `file` field during snapshot); users with `startup.mode = latest`
are unaffected
- **Checkpoint format**: unchanged