[ 
https://issues.apache.org/jira/browse/FLINK-38691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040437#comment-18040437
 ] 

Tejansh Rana commented on FLINK-38691:
--------------------------------------

Discussed in email thread:
 # This feature should be supported in the DataStream API, since the Flink 
SQL/Table API relies on RowData.
 # To avoid a breaking change and the performance overhead of parsing these 
events unconditionally, the feature should be gated behind an option that 
enables/disables parsing and emitting of transaction boundary events, disabled 
by default (see the sketch after this list).
 # The MySQL pipeline connector is out of scope for this ticket.
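
As a rough illustration of point 2, the switch could be a standard Flink 
ConfigOption. The option key below is a placeholder, not an agreed name:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Placeholder key; the final name would be settled during review.
public static final ConfigOption<Boolean> EMIT_TRANSACTION_BOUNDARY_EVENTS =
        ConfigOptions.key("scan.emit-transaction-boundary-events.enabled")
                .booleanType()
                .defaultValue(false) // disabled by default, per point 2
                .withDescription(
                        "Whether to parse Debezium transaction metadata records "
                                + "and emit transaction boundary (BEGIN/END) "
                                + "events to the pipeline.");
{code}

Defaulting to false keeps existing pipelines unchanged and avoids the parsing 
overhead unless users opt in.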

> Support for Transaction Boundary Events in Flink CDC Connector
> --------------------------------------------------------------
>
>                 Key: FLINK-38691
>                 URL: https://issues.apache.org/jira/browse/FLINK-38691
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>            Reporter: Tejansh Rana
>            Priority: Major
>
> Following my discussion with [~leonard] at Flink Forward, I am writing to 
> propose a feature enhancement for the Flink CDC connectors based on Debezium, 
> related to how it handles transaction metadata.
> *Problem Statement:*
> In data streaming pipelines that require transactional guarantees or need to 
> group atomic changes together, it is essential to identify the boundaries of 
> the original database transaction (i.e., the BEGIN and COMMIT or END events). 
> Currently, the Flink CDC connectors skip these transaction lifecycle events:
> [https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java#L77]
> [https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L92]
> This omission makes it challenging to reconstruct the original transaction 
> scope. Without explicit transaction markers, downstream Flink jobs cannot 
> easily guarantee atomicity across sinks.
> *Proposed Solution:*
> The underlying CDC mechanism, Debezium, supports emitting transaction 
> boundary events (BEGIN and END/COMMIT) through its configuration.
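> For context, this switch is reachable today through the connector's Debezium 
> pass-through properties; a minimal sketch for the MySQL source follows 
> (connection details are placeholders). As the logs further below show, the 
> resulting records are currently skipped by the emitter.
> {code:java}
> import java.util.Properties;
>
> import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
> import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
>
> Properties dbzProps = new Properties();
> // Debezium's own switch: when enabled, BEGIN/END records are produced on
> // the "<topic.prefix>.transaction" topic.
> dbzProps.setProperty("provide.transaction.metadata", "true");
>
> MySqlSource<String> source =
>         MySqlSource.<String>builder()
>                 .hostname("localhost") // placeholder connection details
>                 .port(3306)
>                 .databaseList("inventory")
>                 .tableList("inventory.orders")
>                 .username("flinkuser")
>                 .password("flinkpw")
>                 .deserializer(new JsonDebeziumDeserializationSchema())
>                 .debeziumProperties(dbzProps)
>                 .build();
> {code}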
> We propose enhancing the Flink CDC connectors to expose this transaction 
> metadata to the Flink pipeline. The connector should emit specialised records 
> or metadata fields that mark the start and end of each source transaction.
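> One possible shape for such a record on the DataStream side, with fields 
> mirroring Debezium's TransactionMetadataValue struct (status, id); everything 
> here is illustrative, not settled API:
> {code:java}
> import java.io.Serializable;
>
> /** Illustrative only: a boundary event emitted alongside change records. */
> public final class TransactionBoundaryEvent implements Serializable {
>
>     public enum Status { BEGIN, END }
>
>     private final Status status;        // maps to Debezium's "status" field
>     private final String transactionId; // maps to Debezium's "id" field
>
>     public TransactionBoundaryEvent(Status status, String transactionId) {
>         this.status = status;
>         this.transactionId = transactionId;
>     }
>
>     public Status getStatus() { return status; }
>
>     public String getTransactionId() { return transactionId; }
> }
> {code}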
> *Logs of current behaviour:*
> MySQL CDC connector
> {code}
> 2025-11-04 14:34:14,033 INFO  org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] - Meet unknown element
>   SourceRecord{sourcePartition={server=mysql_binlog_source},
>     sourceOffset={transaction_id=4541146d-b988-11f0-87f6-0242ac140006:18, file=mysql-bin.000003, pos=5850,
>       gtids=4541146d-b988-11f0-87f6-0242ac140006:1-17, server_id=1}}
>   ConnectRecord{topic='mysql_binlog_source.transaction', kafkaPartition=null,
>     key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:18},
>     keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
>     value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:18},
>     valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
>     timestamp=null, headers=ConnectHeaders(headers=)}, just skip.
> {code}
> PostgreSQL CDC connector
> {code}
> 2025-11-18 09:02:25,544 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter [] - Meet unknown element
>   SourceRecord{sourcePartition={server=postgres_cdc_source},
>     sourceOffset={transaction_id=843, lsn_proc=31198112, lsn=31198568, txId=843, ts_usec=1763456544336451}}
>   ConnectRecord{topic='postgres_cdc_source.transaction', kafkaPartition=null,
>     key=Struct{id=843},
>     keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
>     value=Struct{status=BEGIN,id=843},
>     valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
>     timestamp=null, headers=ConnectHeaders(headers=)}
>   for splitState = StreamSplitState{startingOffset=Offset{lsn=LSN{0/1DC0BA0}, txId=840, lastCommitTs=-9223372036854775808},
>     endingOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810},
>     split=StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{0/1DC0BA0}, txId=840, lastCommitTs=-9223372036854775808},
>       endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810},
>       isSuspended=false, isSnapshotCompleted=true}}, just skip.
> {code}
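>
> Both logs show the boundary records arriving on the 
> "<server-name>.transaction" topic and being dropped by the emitters. A sketch 
> of how they could be recognised and unpacked instead; this is illustrative 
> only, not the contents of the draft PR below:
> {code:java}
> import org.apache.kafka.connect.data.Struct;
> import org.apache.kafka.connect.source.SourceRecord;
>
> /** Illustrative helpers for Debezium transaction metadata records. */
> final class TransactionMetadataRecords {
>
>     /** Debezium routes boundary events to "<server-name>.transaction". */
>     static boolean isTransactionMetadataRecord(SourceRecord record) {
>         return record.topic() != null
>                 && record.topic().endsWith(".transaction");
>     }
>
>     /** "BEGIN" or "END". */
>     static String status(SourceRecord record) {
>         return ((Struct) record.value()).getString("status");
>     }
>
>     /** Source transaction id, e.g. a GTID or a Postgres txId. */
>     static String transactionId(SourceRecord record) {
>         return ((Struct) record.value()).getString("id");
>     }
> }
> {code}
> Gated behind the option from the discussion above, the emitters could then 
> build a boundary event from these fields instead of logging and skipping.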
>  
> Draft PR for the MySQL connector with the proposed changes: 
> [https://github.com/apache/flink-cdc/pull/4170]
> I would be happy to create a corresponding PR for the base 
> IncrementalSourceRecordEmitter as well, should we accept this proposal.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
