[
https://issues.apache.org/jira/browse/FLINK-38691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040437#comment-18040437
]
Tejansh Rana commented on FLINK-38691:
--------------------------------------
Discussed in email thread:
# This feature should be supported in the DataStream API since Flink SQL/Table
API relies on RowData.
# To avoid a breaking change and the performance overhead of parsing these
events, the feature should be controlled by an option that enables or disables
parsing and emitting them. The option should be disabled by default.
# The MySQL pipeline connector is out of scope for this ticket.
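A minimal sketch of the opt-in behaviour described in point 2, not the actual connector code. SimpleRecord and OptInTransactionEmitter are hypothetical stand-ins for Debezium's SourceRecord and the connector's record emitter. Only the schema name is taken from the logs quoted in the ticket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a Debezium SourceRecord. */
final class SimpleRecord {
    final String valueSchemaName;
    final Map<String, String> value;

    SimpleRecord(String valueSchemaName, Map<String, String> value) {
        this.valueSchemaName = valueSchemaName;
        this.value = value;
    }
}

/** Hypothetical emitter that forwards transaction boundary records only when enabled. */
final class OptInTransactionEmitter {
    // Value schema name as it appears in the connector logs quoted in this ticket.
    static final String TX_VALUE_SCHEMA =
            "io.debezium.connector.common.TransactionMetadataValue";

    private final boolean emitTransactionEvents;

    /** Feature is disabled by default, which preserves the current "just skip" behaviour. */
    OptInTransactionEmitter() {
        this(false);
    }

    OptInTransactionEmitter(boolean emitTransactionEvents) {
        this.emitTransactionEvents = emitTransactionEvents;
    }

    static boolean isTransactionRecord(SimpleRecord record) {
        return TX_VALUE_SCHEMA.equals(record.valueSchemaName);
    }

    /** Returns the records that would be passed downstream. */
    List<SimpleRecord> emit(List<SimpleRecord> input) {
        List<SimpleRecord> out = new ArrayList<>();
        for (SimpleRecord record : input) {
            if (isTransactionRecord(record) && !emitTransactionEvents) {
                continue; // option disabled: drop the boundary record without parsing it
            }
            out.add(record);
        }
        return out;
    }
}
```

With the flag left at its default, existing jobs see the same output as today. Only jobs that opt in receive the BEGIN/END records.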
> Support for Transaction Boundary Events in Flink CDC Connector
> --------------------------------------------------------------
>
> Key: FLINK-38691
> URL: https://issues.apache.org/jira/browse/FLINK-38691
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Reporter: Tejansh Rana
> Priority: Major
>
> Following my discussion with [~leonard] at Flink Forward, I am writing to
> propose a feature enhancement for the Flink CDC connectors based on Debezium,
> related to how it handles transaction metadata.
> *Problem Statement:*
> In data streaming pipelines that require transactional guarantees or need to
> group atomic changes together, it is essential to identify the boundaries of
> the original database transaction (i.e., the BEGIN and COMMIT or END events).
> Currently, the Flink CDC connectors appear to skip these transaction
> lifecycle events -
> [https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java#L77]
>
> [https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L92]
> This omission makes it challenging to reconstruct the original transaction
> scope. Without explicit transaction markers, downstream Flink jobs cannot
> easily guarantee atomicity across sinks.
> *Proposed Solution:*
> The underlying CDC mechanism, Debezium, supports emitting transaction
> boundary events (BEGIN and END/COMMIT) through its configuration.
> We propose enhancing the Flink CDC connectors to expose this transaction
> metadata to the Flink pipeline. The connector should emit specialised records
> or metadata fields that indicate the start and end of a transaction, as
> emitted by Debezium.
> *Logs of current behaviour:*
> MySQL CDC connector
> {{2025-11-04 14:34:14,033 INFO
> org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] -
> Meet unknown element
> SourceRecord\{sourcePartition={server=mysql_binlog_source},
> sourceOffset=\{transaction_id=4541146d-b988-11f0-87f6-0242ac140006:18,
> file=mysql-bin.000003, pos=5850,
> gtids=4541146d-b988-11f0-87f6-0242ac140006:1-17, server_id=1}}
> ConnectRecord\{topic='mysql_binlog_source.transaction', kafkaPartition=null,
> key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:18},
> keySchema=Schema\{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
> value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:18},
> valueSchema=Schema\{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
> timestamp=null, headers=ConnectHeaders(headers=)}, just skip.}}
> PostgreSQL CDC connector
> {{2025-11-18 09:02:25,544 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter
> [] - Meet unknown element
> SourceRecord\{sourcePartition={server=postgres_cdc_source},
> sourceOffset=\{transaction_id=843, lsn_proc=31198112, lsn=31198568, txId=843,
> ts_usec=1763456544336451}}
> ConnectRecord\{topic='postgres_cdc_source.transaction', kafkaPartition=null,
> key=Struct{id=843},
> keySchema=Schema\{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
> value=Struct\{status=BEGIN,id=843},
> valueSchema=Schema\{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
> timestamp=null, headers=ConnectHeaders(headers=)} for splitState =
> StreamSplitState\{startingOffset=Offset{lsn=LSN{0/1DC0BA0}, txId=840,
> lastCommitTs=-9223372036854775808},
> endingOffset=Offset\{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
> lastCommitTs=-9223372036853775810},
> split=StreamSplit\{splitId='stream-split', offset=Offset{lsn=LSN{0/1DC0BA0},
> txId=840, lastCommitTs=-9223372036854775808},
> endOffset=Offset\{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null,
> lastCommitTs=-9223372036853775810}, isSuspended=false,
> isSnapshotCompleted=true}}, just skip.}}
>
> Draft PR for the MySQL connector with the proposed changes:
> [https://github.com/apache/flink-cdc/pull/4170]
> I would be happy to create a corresponding PR for the base
> IncrementalSourceRecordEmitter as well if this proposal is accepted.
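To illustrate why the problem statement above asks for boundary markers, here is a minimal sketch of how a downstream consumer could group changes per transaction once BEGIN and END events are available. TxEvent and TransactionGrouper are hypothetical names, not part of Flink CDC or Debezium. The sketch buffers in memory only, so a real Flink job would need to keep the buffer in checkpointed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical downstream event: a transaction boundary or a data change. */
final class TxEvent {
    enum Kind { BEGIN, DATA, END }

    final Kind kind;
    final String txId;
    final String payload; // null for boundary events

    private TxEvent(Kind kind, String txId, String payload) {
        this.kind = kind;
        this.txId = txId;
        this.payload = payload;
    }

    static TxEvent begin(String txId) { return new TxEvent(Kind.BEGIN, txId, null); }
    static TxEvent data(String txId, String payload) { return new TxEvent(Kind.DATA, txId, payload); }
    static TxEvent end(String txId) { return new TxEvent(Kind.END, txId, null); }
}

/** Buffers data changes between BEGIN and END and releases them as one batch. */
final class TransactionGrouper {
    private final List<String> buffer = new ArrayList<>();
    private String openTxId; // null while no transaction is open

    /** Returns the full list of changes when the open transaction ends, else empty. */
    Optional<List<String>> accept(TxEvent event) {
        if (event.kind == TxEvent.Kind.BEGIN) {
            openTxId = event.txId;
            buffer.clear();
        } else if (event.kind == TxEvent.Kind.DATA) {
            // Only buffer changes that belong to the currently open transaction.
            if (event.txId.equals(openTxId)) {
                buffer.add(event.payload);
            }
        } else if (event.txId.equals(openTxId)) {
            // END for the open transaction: hand the batch over atomically.
            List<String> completed = new ArrayList<>(buffer);
            buffer.clear();
            openTxId = null;
            return Optional.of(completed);
        }
        return Optional.empty();
    }
}
```

Without the BEGIN and END records, the grouper has no signal for when a batch is complete, which is the gap described in the ticket.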