Tejansh Rana created FLINK-38691:
------------------------------------
Summary: Support for Transaction Boundary Events in Flink CDC Connector
Key: FLINK-38691
URL: https://issues.apache.org/jira/browse/FLINK-38691
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Reporter: Tejansh Rana
Following my discussion with [~leonard] at Flink Forward, I am writing to
propose a feature enhancement for the Debezium-based Flink CDC connectors,
related to how they handle transaction metadata.
*Problem Statement:*
In data streaming pipelines that require transactional guarantees or need to
group atomic changes together, it is essential to identify the boundaries of
the original database transaction (i.e., the BEGIN and COMMIT or END events).
Currently, the Flink CDC connectors appear to skip these transaction lifecycle
events:
[https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java#L77]
[https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L92]
This omission makes it challenging to reconstruct the original transaction
scope. Without explicit transaction markers, downstream Flink jobs cannot
easily guarantee atomicity across sinks.
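To make the downstream need concrete, here is a minimal sketch of how a consumer could group changes into atomic batches if boundary markers were available. The class and method names are hypothetical and changes are modelled as plain strings; this is an illustration, not part of any Flink CDC API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: buffers changes until the transaction's END marker arrives. */
final class TransactionBuffer {
    private final List<String> pending = new ArrayList<>();
    private boolean open = false;

    /** Called on a BEGIN marker: start a fresh transaction scope. */
    void begin() {
        pending.clear();
        open = true;
    }

    /** Called for each row-level change; changes outside a transaction are rejected. */
    void add(String change) {
        if (!open) {
            throw new IllegalStateException("change received outside a transaction");
        }
        pending.add(change);
    }

    /** Called on an END marker: return the complete batch and close the scope. */
    List<String> end() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        open = false;
        return batch;
    }
}
```

Without the BEGIN and END markers there is no reliable signal for when `begin()` and `end()` should be called, which is the gap this issue describes.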
*Proposed Solution:*
The underlying CDC mechanism, Debezium, supports emitting transaction boundary
events (BEGIN and END/COMMIT) through its configuration.
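As a rough sketch of the configuration side: Debezium's `provide.transaction.metadata` connector option controls whether these boundary events are produced. The helper class and method names below are hypothetical; only the property key is Debezium's.

```java
import java.util.Properties;

/** Hypothetical helper; only the property key comes from Debezium. */
final class TransactionMetadataConfig {
    /** Debezium connector option that enables BEGIN/END transaction events. */
    static final String PROVIDE_TX_METADATA = "provide.transaction.metadata";

    /** Builds Debezium properties with transaction metadata events enabled. */
    static Properties debeziumProperties() {
        Properties props = new Properties();
        props.setProperty(PROVIDE_TX_METADATA, "true");
        return props;
    }
}
```

In a Flink CDC job, properties like these would typically be passed through the source builder's `debeziumProperties(...)` method, assuming the connector in use exposes it.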
We propose enhancing the Flink CDC connectors to expose this transaction
metadata to the Flink pipeline. The connector should emit specialised records
or metadata fields that mark the start and end of each transaction, as emitted
by Debezium.
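One possible shape for the detection step, sketched without Kafka Connect dependencies: a record emitter could recognise boundary records by their value schema name and `status` field. The schema name and the BEGIN status are taken from the records logged in this issue, and END is the status Debezium uses for the closing event. The class, enum, and method names are hypothetical, and this is not necessarily how the draft PR does it.

```java
/** Hypothetical sketch: decides whether a Debezium record is a transaction boundary. */
final class TransactionBoundaryClassifier {
    /** Value schema name that appears on the skipped records in the logs. */
    static final String TX_VALUE_SCHEMA =
            "io.debezium.connector.common.TransactionMetadataValue";

    enum Boundary { BEGIN, END, NONE }

    /**
     * @param valueSchemaName name of the record's value schema, may be null
     * @param status          value of the record's "status" field, may be null
     */
    static Boundary classify(String valueSchemaName, String status) {
        if (!TX_VALUE_SCHEMA.equals(valueSchemaName)) {
            return Boundary.NONE; // ordinary data change or some other record type
        }
        if ("BEGIN".equals(status)) {
            return Boundary.BEGIN;
        }
        if ("END".equals(status)) {
            return Boundary.END;
        }
        return Boundary.NONE; // unknown status: do not treat as a boundary
    }
}
```

An emitter could then forward BEGIN and END records downstream instead of logging "Meet unknown element" and skipping them.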
*Logs of current behaviour:*
MySQL CDC connector
{{2025-11-04 14:34:14,033 INFO
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] -
Meet unknown element
SourceRecord\{sourcePartition={server=mysql_binlog_source},
sourceOffset=\{transaction_id=4541146d-b988-11f0-87f6-0242ac140006:18,
file=mysql-bin.000003, pos=5850,
gtids=4541146d-b988-11f0-87f6-0242ac140006:1-17, server_id=1}}
ConnectRecord\{topic='mysql_binlog_source.transaction', kafkaPartition=null,
key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:18},
keySchema=Schema\{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:18},
valueSchema=Schema\{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)}, just skip.}}
PostgreSQL CDC connector
{{2025-11-18 09:02:25,544 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter
[] - Meet unknown element
SourceRecord\{sourcePartition={server=postgres_cdc_source},
sourceOffset=\{transaction_id=843, lsn_proc=31198112, lsn=31198568, txId=843,
ts_usec=1763456544336451}}
ConnectRecord\{topic='postgres_cdc_source.transaction', kafkaPartition=null,
key=Struct{id=843},
keySchema=Schema\{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
value=Struct\{status=BEGIN,id=843},
valueSchema=Schema\{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)} for splitState =
StreamSplitState\{startingOffset=Offset{lsn=LSN{0/1DC0BA0}, txId=840,
lastCommitTs=-9223372036854775808},
endingOffset=Offset\{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810}, split=StreamSplit\{splitId='stream-split',
offset=Offset{lsn=LSN{0/1DC0BA0}, txId=840, lastCommitTs=-9223372036854775808},
endOffset=Offset\{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
lastCommitTs=-9223372036853775810}, isSuspended=false,
isSnapshotCompleted=true}}, just skip.}}
A draft PR for the MySQL connector with the proposed changes is available at
[https://github.com/apache/flink-cdc/pull/4170].
I would be happy to create a corresponding PR for the base
IncrementalSourceRecordEmitter as well, should this proposal be accepted.