This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f909f458 [FLINK-38691] [cdc connector mysql] Support for MySQL 
Transaction Boundary Events in Flink CDC Connector (#4170)
7f909f458 is described below

commit 7f909f458e49c1b0aaff83484fe13af37da4132e
Author: Tejansh <[email protected]>
AuthorDate: Tue Jan 27 16:07:54 2026 +0000

    [FLINK-38691] [cdc connector mysql] Support for MySQL Transaction Boundary 
Events in Flink CDC Connector (#4170)
---
 .../source/reader/MySqlPipelineRecordEmitter.java  |   3 +-
 .../cdc/connectors/mysql/source/MySqlSource.java   |   3 +-
 .../mysql/source/MySqlSourceBuilder.java           |   7 +
 .../mysql/source/config/MySqlSourceConfig.java     |   7 +
 .../source/config/MySqlSourceConfigFactory.java    |  15 +
 .../mysql/source/reader/MySqlRecordEmitter.java    |  10 +-
 .../connectors/mysql/source/utils/RecordUtils.java |  14 +
 .../source/reader/MySqlRecordEmitterTest.java      | 337 +++++++++++++++++++++
 .../mysql/source/reader/MySqlSourceReaderTest.java |  10 +-
 9 files changed, 401 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index bf4f15cee..eb9e658cf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -93,7 +93,8 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 sourceConfig.isIncludeSchemaChanges(),
-                false); // Explicitly disable heartbeat events
+                false, // Explicitly disable heartbeat events
+                false); // Explicitly disable transaction metadata events
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceConfig = sourceConfig;
         this.alreadySendCreateTableTables = new HashSet<>();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index d7f030574..1ec4c50ea 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -133,7 +133,8 @@ public class MySqlSource<T>
                                 deserializationSchema,
                                 sourceReaderMetrics,
                                 sourceConfig.isIncludeSchemaChanges(),
-                                sourceConfig.isIncludeHeartbeatEvents()));
+                                sourceConfig.isIncludeHeartbeatEvents(),
+                                
sourceConfig.isIncludeTransactionMetadataEvents()));
     }
 
     MySqlSource(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index b37ed19e6..93fa2a0d3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -198,6 +198,13 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /** Whether the {@link MySqlSource} should output the transaction metadata 
events or not. */
+    public MySqlSourceBuilder<T> includeTransactionMetadataEvents(
+            boolean includeTransactionMetadataEvents) {
+        
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
+        return this;
+    }
+
     /** Whether the {@link MySqlSource} should scan the newly added tables or 
not. */
     public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean 
scanNewlyAddedTableEnabled) {
         
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 49570463b..cf456fcae 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -63,6 +63,7 @@ public class MySqlSourceConfig implements Serializable {
     private final double distributionFactorLower;
     private final boolean includeSchemaChanges;
     private final boolean includeHeartbeatEvents;
+    private final boolean includeTransactionMetadataEvents;
     private final boolean scanNewlyAddedTableEnabled;
     private final boolean closeIdleReaders;
     private final Properties jdbcProperties;
@@ -101,6 +102,7 @@ public class MySqlSourceConfig implements Serializable {
             double distributionFactorLower,
             boolean includeSchemaChanges,
             boolean includeHeartbeatEvents,
+            boolean includeTransactionMetadataEvents,
             boolean scanNewlyAddedTableEnabled,
             boolean closeIdleReaders,
             Properties dbzProperties,
@@ -131,6 +133,7 @@ public class MySqlSourceConfig implements Serializable {
         this.distributionFactorLower = distributionFactorLower;
         this.includeSchemaChanges = includeSchemaChanges;
         this.includeHeartbeatEvents = includeHeartbeatEvents;
+        this.includeTransactionMetadataEvents = 
includeTransactionMetadataEvents;
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
         this.closeIdleReaders = closeIdleReaders;
         this.dbzProperties = checkNotNull(dbzProperties);
@@ -234,6 +237,10 @@ public class MySqlSourceConfig implements Serializable {
         return includeHeartbeatEvents;
     }
 
+    public boolean isIncludeTransactionMetadataEvents() {
+        return includeTransactionMetadataEvents;
+    }
+
     public boolean isScanNewlyAddedTableEnabled() {
         return scanNewlyAddedTableEnabled;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 7010d732d..569b62232 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.table.catalog.ObjectPath;
 
+import io.debezium.config.CommonConnectorConfig;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.time.ZoneId;
@@ -64,6 +66,7 @@ public class MySqlSourceConfigFactory implements Serializable 
{
             
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
     private boolean includeSchemaChanges = false;
     private boolean includeHeartbeatEvents = false;
+    private boolean includeTransactionMetadataEvents = false;
     private boolean scanNewlyAddedTableEnabled = false;
     private boolean closeIdleReaders = false;
     private Properties jdbcProperties;
@@ -242,6 +245,13 @@ public class MySqlSourceConfigFactory implements 
Serializable {
         return this;
     }
 
+    /** Whether the {@link MySqlSource} should output the transaction metadata 
events or not. */
+    public MySqlSourceConfigFactory includeTransactionMetadataEvents(
+            boolean includeTransactionMetadataEvents) {
+        this.includeTransactionMetadataEvents = 
includeTransactionMetadataEvents;
+        return this;
+    }
+
     /** Whether the {@link MySqlSource} should scan the newly added tables or 
not. */
     public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean 
scanNewlyAddedTableEnabled) {
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -366,6 +376,10 @@ public class MySqlSourceConfigFactory implements 
Serializable {
         // Note: the includeSchemaChanges parameter is used to control 
emitting the schema record,
         // only DataStream API program need to emit the schema record, the 
Table API need not
         props.setProperty("include.schema.changes", String.valueOf(true));
+        // enable transaction metadata if includeTransactionMetadataEvents is 
true
+        props.setProperty(
+                CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(),
+                String.valueOf(includeTransactionMetadataEvents));
         // disable the offset flush totally
         props.setProperty("offset.flush.interval.ms", 
String.valueOf(Long.MAX_VALUE));
         // disable tombstones
@@ -420,6 +434,7 @@ public class MySqlSourceConfigFactory implements 
Serializable {
                 distributionFactorLower,
                 includeSchemaChanges,
                 includeHeartbeatEvents,
+                includeTransactionMetadataEvents,
                 scanNewlyAddedTableEnabled,
                 closeIdleReaders,
                 props,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index 486e637ce..79066a512 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -54,17 +54,20 @@ public class MySqlRecordEmitter<T> implements 
RecordEmitter<SourceRecords, T, My
     private final MySqlSourceReaderMetrics sourceReaderMetrics;
     private final boolean includeSchemaChanges;
     private final boolean includeHeartbeatEvents;
+    private final boolean includeTransactionMetadataEvents;
     private final OutputCollector<T> outputCollector;
 
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
             MySqlSourceReaderMetrics sourceReaderMetrics,
             boolean includeSchemaChanges,
-            boolean includeHeartbeatEvents) {
+            boolean includeHeartbeatEvents,
+            boolean includeTransactionMetadataEvents) {
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceReaderMetrics = sourceReaderMetrics;
         this.includeSchemaChanges = includeSchemaChanges;
         this.includeHeartbeatEvents = includeHeartbeatEvents;
+        this.includeTransactionMetadataEvents = 
includeTransactionMetadataEvents;
         this.outputCollector = new OutputCollector<>();
     }
 
@@ -108,6 +111,11 @@ public class MySqlRecordEmitter<T> implements 
RecordEmitter<SourceRecords, T, My
             if (includeHeartbeatEvents) {
                 emitElement(element, output);
             }
+        } else if (RecordUtils.isTransactionMetadataEvent(element)) {
+            updateStartingOffsetForSplit(splitState, element);
+            if (includeTransactionMetadataEvents) {
+                emitElement(element, output);
+            }
         } else {
             // unknown element
             LOG.info("Meet unknown element {}, just skip.", element);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index 8ae91ad9d..bf4d4f29f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -73,6 +73,8 @@ public class RecordUtils {
             "io.debezium.connector.mysql.SchemaChangeKey";
     public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
             "io.debezium.connector.common.Heartbeat";
+    public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
+            "io.debezium.connector.common.TransactionMetadataKey";
     private static final DocumentReader DOCUMENT_READER = 
DocumentReader.defaultReader();
 
     /** Converts a {@link ResultSet} row to an array of Objects. */
@@ -339,6 +341,18 @@ public class RecordUtils {
                 && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
     }
 
+    /**
+     * Check whether the given source record is a transaction metadata event 
(BEGIN or END).
+     *
+     * <p>Transaction events are emitted by Debezium to mark transaction 
boundaries when
+     * provide.transaction.metadata is enabled.
+     */
+    public static boolean isTransactionMetadataEvent(SourceRecord record) {
+        Schema keySchema = record.keySchema();
+        return keySchema != null
+                && 
SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
+    }
+
     /**
      * Return the finished snapshot split information.
      *
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
index 7a3764461..22167e0e5 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -36,11 +37,17 @@ import io.debezium.jdbc.JdbcConfiguration;
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC;
 import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME;
@@ -103,9 +110,233 @@ class MySqlRecordEmitterTest {
                 new MySqlSourceReaderMetrics(
                         
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
                 false,
+                false,
                 false);
     }
 
+    @Test
+    void testTransactionMetadataEventsDisabledByDefault() throws Exception {
+        SourceRecord transactionBeginEvent =
+                createTransactionMetadataEvent("BEGIN", "tx-123", 100L);
+
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+                .isTrue();
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithTransactionConfig(emittedRecordsCount, 
false);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(transactionBeginEvent), 
readerOutput, splitState);
+
+        // Verify the offset was updated (this should always happen)
+        BinlogOffset expectedOffset = 
RecordUtils.getBinlogPosition(transactionBeginEvent);
+        Assertions.assertThat(splitState.getStartingOffset())
+                .isNotNull()
+                .isNotEqualTo(offsetBeforeEmit)
+                .isEqualByComparingTo(expectedOffset);
+
+        // Verify the event was NOT emitted (because 
includeTransactionMetadataEvents=false)
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0);
+        Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty();
+    }
+
+    @Test
+    void testTransactionMetadataEventsEnabledExplicitly() throws Exception {
+        SourceRecord transactionBeginEvent =
+                createTransactionMetadataEvent("BEGIN", "tx-456", 150L);
+
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+                .isTrue();
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithTransactionConfig(emittedRecordsCount, 
true);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(transactionBeginEvent), 
readerOutput, splitState);
+
+        // Verify the offset was updated
+        BinlogOffset expectedOffset = 
RecordUtils.getBinlogPosition(transactionBeginEvent);
+        Assertions.assertThat(splitState.getStartingOffset())
+                .isNotNull()
+                .isNotEqualTo(offsetBeforeEmit)
+                .isEqualByComparingTo(expectedOffset);
+
+        // Verify the event was emitted (because 
includeTransactionMetadataEvents=true)
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+        Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1);
+    }
+
+    @Test
+    void testTransactionBeginEventHandling() throws Exception {
+        SourceRecord transactionBeginEvent =
+                createTransactionMetadataEvent("BEGIN", "tx-123", 100L);
+
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+                .isTrue();
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithCounter(emittedRecordsCount);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(transactionBeginEvent), 
readerOutput, splitState);
+
+        // Verify the offset was updated
+        BinlogOffset expectedOffset = 
RecordUtils.getBinlogPosition(transactionBeginEvent);
+        Assertions.assertThat(splitState.getStartingOffset())
+                .isNotNull()
+                .isNotEqualTo(offsetBeforeEmit)
+                .isEqualByComparingTo(expectedOffset);
+
+        // Verify the event was emitted
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+    }
+
+    @Test
+    void testTransactionEndEventHandling() throws Exception {
+        SourceRecord transactionEndEvent = 
createTransactionMetadataEvent("END", "tx-123", 200L);
+
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)).isTrue();
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithCounter(emittedRecordsCount);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(transactionEndEvent), 
readerOutput, splitState);
+
+        // Verify the offset was updated
+        BinlogOffset expectedOffset = 
RecordUtils.getBinlogPosition(transactionEndEvent);
+        Assertions.assertThat(splitState.getStartingOffset())
+                .isNotNull()
+                .isEqualByComparingTo(expectedOffset);
+
+        // Verify the event was emitted
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+    }
+
+    @Test
+    void testNonTransactionEventNotDetected() {
+        Schema keySchema = SchemaBuilder.struct().field("id", 
Schema.INT32_SCHEMA).build();
+        Schema valueSchema = SchemaBuilder.struct().field("op", 
Schema.STRING_SCHEMA).build();
+
+        Struct key = new Struct(keySchema).put("id", 1);
+        Struct value = new Struct(valueSchema).put("op", "c");
+
+        Map<String, Object> offset = new HashMap<>();
+        offset.put("file", "mysql-bin.000001");
+        offset.put("pos", 100L);
+
+        SourceRecord dataRecord =
+                new SourceRecord(
+                        Collections.singletonMap("server", "mysql"),
+                        offset,
+                        "test.table",
+                        keySchema,
+                        key,
+                        valueSchema,
+                        value);
+
+        // Verify it's NOT detected as a transaction metadata event
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse();
+    }
+
+    @Test
+    void testTransactionEventWithoutKeySchemaNotDetected() {
+        Schema valueSchema =
+                SchemaBuilder.struct()
+                        
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
+                        .field("status", Schema.STRING_SCHEMA)
+                        .build();
+
+        Struct value = new Struct(valueSchema).put("status", "BEGIN");
+
+        Map<String, Object> offset = new HashMap<>();
+        offset.put("file", "mysql-bin.000001");
+        offset.put("pos", 100L);
+
+        SourceRecord record =
+                new SourceRecord(
+                        Collections.singletonMap("server", "mysql"),
+                        offset,
+                        "transaction.topic",
+                        null, // No key schema
+                        null,
+                        valueSchema,
+                        value);
+
+        // Verify it's NOT detected as a transaction metadata event
+        
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse();
+    }
+
+    @Test
+    void testMultipleTransactionEventsWithDisabledConfig() throws Exception {
+        SourceRecord beginEvent = createTransactionMetadataEvent("BEGIN", 
"tx-789", 300L);
+        SourceRecord endEvent = createTransactionMetadataEvent("END", 
"tx-789", 400L);
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithTransactionConfig(emittedRecordsCount, 
false);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(beginEvent), readerOutput, 
splitState);
+
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(endEvent), readerOutput, 
splitState);
+
+        // Verify offsets were updated but no events were emitted
+        BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent);
+        Assertions.assertThat(splitState.getStartingOffset())
+                .isNotNull()
+                .isEqualByComparingTo(expectedOffset);
+
+        // Verify no events were emitted (because 
includeTransactionMetadataEvents=false)
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0);
+        Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty();
+    }
+
+    @Test
+    void testMixedEventsWithTransactionMetadataDisabled() throws Exception {
+        SourceRecord transactionEvent = 
createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L);
+        SourceRecord dataEvent = createDataChangeEvent("test.table", 501L);
+
+        AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+        MySqlRecordEmitter<String> recordEmitter =
+                createRecordEmitterWithTransactionConfig(emittedRecordsCount, 
false);
+        MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+        TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(transactionEvent), 
readerOutput, splitState);
+
+        recordEmitter.emitRecord(
+                SourceRecords.fromSingleRecord(dataEvent), readerOutput, 
splitState);
+
+        // Verify only data event was emitted (count=1, not 2)
+        Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+        Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1);
+    }
+
     private MySqlBinlogSplitState createBinlogSplitState() {
         return new MySqlBinlogSplitState(
                 new MySqlBinlogSplit(
@@ -116,4 +347,110 @@ class MySqlRecordEmitterTest {
                         Collections.emptyMap(),
                         0));
     }
+
+    /** Helper method to create a MySqlRecordEmitter that counts emitted 
records. */
+    private MySqlRecordEmitter<String> 
createRecordEmitterWithCounter(AtomicInteger counter) {
+        return createRecordEmitterWithTransactionConfig(counter, true);
+    }
+
+    /**
+     * Helper method to create a MySqlRecordEmitter with configurable 
transaction metadata events.
+     */
+    private MySqlRecordEmitter<String> 
createRecordEmitterWithTransactionConfig(
+            AtomicInteger counter, boolean includeTransactionMetadataEvents) {
+        return new MySqlRecordEmitter<>(
+                new DebeziumDeserializationSchema<String>() {
+                    @Override
+                    public void deserialize(SourceRecord record, 
Collector<String> out) {
+                        counter.incrementAndGet();
+                        out.collect("transaction-event");
+                    }
+
+                    @Override
+                    public TypeInformation<String> getProducedType() {
+                        return TypeInformation.of(String.class);
+                    }
+                },
+                new MySqlSourceReaderMetrics(
+                        
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
+                false,
+                false,
+                includeTransactionMetadataEvents);
+    }
+
+    private SourceRecord createTransactionMetadataEvent(
+            String status, String transactionId, long position) {
+        Schema keySchema =
+                SchemaBuilder.struct()
+                        
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
+                        .field("id", Schema.STRING_SCHEMA)
+                        .build();
+
+        Schema valueSchema =
+                SchemaBuilder.struct()
+                        
.name("io.debezium.connector.common.TransactionMetadataValue")
+                        .field("status", Schema.STRING_SCHEMA)
+                        .field("id", Schema.STRING_SCHEMA)
+                        .field("event_count", Schema.OPTIONAL_INT64_SCHEMA)
+                        .field("ts_ms", Schema.INT64_SCHEMA)
+                        .build();
+
+        Struct key = new Struct(keySchema).put("id", transactionId);
+
+        Struct value =
+                new Struct(valueSchema)
+                        .put("status", status)
+                        .put("id", transactionId)
+                        .put("ts_ms", System.currentTimeMillis());
+
+        if ("END".equals(status)) {
+            value.put("event_count", 5L);
+        }
+
+        Map<String, Object> offset = new HashMap<>();
+        offset.put("file", "mysql-bin.000001");
+        offset.put("pos", position);
+        offset.put("transaction_id", transactionId);
+
+        return new SourceRecord(
+                Collections.singletonMap("server", "mysql_binlog_source"),
+                offset,
+                "mysql_binlog_source.transaction",
+                keySchema,
+                key,
+                valueSchema,
+                value);
+    }
+
+    private SourceRecord createDataChangeEvent(String topicName, long 
position) {
+        Schema keySchema = SchemaBuilder.struct().field("id", 
Schema.INT32_SCHEMA).build();
+        Schema valueSchema =
+                SchemaBuilder.struct()
+                        .field("op", Schema.STRING_SCHEMA)
+                        .field(
+                                "after",
+                                SchemaBuilder.struct()
+                                        .field("id", Schema.INT32_SCHEMA)
+                                        .field("name", Schema.STRING_SCHEMA)
+                                        .optional())
+                        .build();
+
+        Struct key = new Struct(keySchema).put("id", 1);
+        Struct after =
+                new Struct(valueSchema.field("after").schema()).put("id", 
1).put("name", "test");
+        Struct value = new Struct(valueSchema).put("op", "c").put("after", 
after);
+
+        Map<String, Object> offset = new HashMap<>();
+        offset.put("file", "mysql-bin.000001");
+        offset.put("pos", position);
+
+        return new SourceRecord(
+                Collections.singletonMap("server", "mysql"),
+                offset,
+                topicName,
+                keySchema,
+                key,
+                valueSchema,
+                value);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 08c337967..7c9ce5ba9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -574,7 +574,8 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                                 new ForwardDeserializeSchema(),
                                 new 
MySqlSourceReaderMetrics(readerContext.metricGroup()),
                                 configuration.isIncludeSchemaChanges(),
-                                configuration.isIncludeHeartbeatEvents());
+                                configuration.isIncludeHeartbeatEvents(),
+                                
configuration.isIncludeTransactionMetadataEvents());
         final MySqlSourceReaderContext mySqlSourceReaderContext =
                 new MySqlSourceReaderContext(readerContext);
         return new MySqlSourceReader<>(
@@ -741,7 +742,12 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                 MySqlSourceReaderMetrics sourceReaderMetrics,
                 boolean includeSchemaChanges,
                 int limit) {
-            super(debeziumDeserializationSchema, sourceReaderMetrics, 
includeSchemaChanges, false);
+            super(
+                    debeziumDeserializationSchema,
+                    sourceReaderMetrics,
+                    includeSchemaChanges,
+                    false,
+                    false);
             this.debeziumDeserializationSchema = debeziumDeserializationSchema;
             this.sourceReaderMetrics = sourceReaderMetrics;
             this.includeSchemaChanges = includeSchemaChanges;

Reply via email to