This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7f909f458 [FLINK-38691] [cdc connector mysql] Support for MySQL
Transaction Boundary Events in Flink CDC Connector (#4170)
7f909f458 is described below
commit 7f909f458e49c1b0aaff83484fe13af37da4132e
Author: Tejansh <[email protected]>
AuthorDate: Tue Jan 27 16:07:54 2026 +0000
[FLINK-38691] [cdc connector mysql] Support for MySQL Transaction Boundary
Events in Flink CDC Connector (#4170)
---
.../source/reader/MySqlPipelineRecordEmitter.java | 3 +-
.../cdc/connectors/mysql/source/MySqlSource.java | 3 +-
.../mysql/source/MySqlSourceBuilder.java | 7 +
.../mysql/source/config/MySqlSourceConfig.java | 7 +
.../source/config/MySqlSourceConfigFactory.java | 15 +
.../mysql/source/reader/MySqlRecordEmitter.java | 10 +-
.../connectors/mysql/source/utils/RecordUtils.java | 14 +
.../source/reader/MySqlRecordEmitterTest.java | 337 +++++++++++++++++++++
.../mysql/source/reader/MySqlSourceReaderTest.java | 10 +-
9 files changed, 401 insertions(+), 5 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index bf4f15cee..eb9e658cf 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -93,7 +93,8 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
debeziumDeserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
- false); // Explicitly disable heartbeat events
+ false, // Explicitly disable heartbeat events
+ false); // Explicitly disable transaction metadata events
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index d7f030574..1ec4c50ea 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -133,7 +133,8 @@ public class MySqlSource<T>
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
- sourceConfig.isIncludeHeartbeatEvents()));
+ sourceConfig.isIncludeHeartbeatEvents(),
+
sourceConfig.isIncludeTransactionMetadataEvents()));
}
MySqlSource(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index b37ed19e6..93fa2a0d3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -198,6 +198,13 @@ public class MySqlSourceBuilder<T> {
return this;
}
+ /** Whether the {@link MySqlSource} should output the transaction metadata
events or not. */
+ public MySqlSourceBuilder<T> includeTransactionMetadataEvents(
+ boolean includeTransactionMetadataEvents) {
+
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
+ return this;
+ }
+
/** Whether the {@link MySqlSource} should scan the newly added tables or
not. */
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean
scanNewlyAddedTableEnabled) {
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 49570463b..cf456fcae 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -63,6 +63,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean includeHeartbeatEvents;
+ private final boolean includeTransactionMetadataEvents;
private final boolean scanNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
@@ -101,6 +102,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean includeHeartbeatEvents,
+ boolean includeTransactionMetadataEvents,
boolean scanNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties dbzProperties,
@@ -131,6 +133,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.includeHeartbeatEvents = includeHeartbeatEvents;
+ this.includeTransactionMetadataEvents =
includeTransactionMetadataEvents;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = checkNotNull(dbzProperties);
@@ -234,6 +237,10 @@ public class MySqlSourceConfig implements Serializable {
return includeHeartbeatEvents;
}
+ public boolean isIncludeTransactionMetadataEvents() {
+ return includeTransactionMetadataEvents;
+ }
+
public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 7010d732d..569b62232 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -23,6 +23,8 @@ import
org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.catalog.ObjectPath;
+import io.debezium.config.CommonConnectorConfig;
+
import java.io.Serializable;
import java.time.Duration;
import java.time.ZoneId;
@@ -64,6 +66,7 @@ public class MySqlSourceConfigFactory implements Serializable
{
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean includeHeartbeatEvents = false;
+ private boolean includeTransactionMetadataEvents = false;
private boolean scanNewlyAddedTableEnabled = false;
private boolean closeIdleReaders = false;
private Properties jdbcProperties;
@@ -242,6 +245,13 @@ public class MySqlSourceConfigFactory implements
Serializable {
return this;
}
+ /** Whether the {@link MySqlSource} should output the transaction metadata
events or not. */
+ public MySqlSourceConfigFactory includeTransactionMetadataEvents(
+ boolean includeTransactionMetadataEvents) {
+ this.includeTransactionMetadataEvents =
includeTransactionMetadataEvents;
+ return this;
+ }
+
/** Whether the {@link MySqlSource} should scan the newly added tables or
not. */
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean
scanNewlyAddedTableEnabled) {
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -366,6 +376,10 @@ public class MySqlSourceConfigFactory implements
Serializable {
// Note: the includeSchemaChanges parameter is used to control
emitting the schema record,
// only DataStream API program need to emit the schema record, the
Table API need not
props.setProperty("include.schema.changes", String.valueOf(true));
+ // enable transaction metadata if includeTransactionMetadataEvents is
true
+ props.setProperty(
+ CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(),
+ String.valueOf(includeTransactionMetadataEvents));
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms",
String.valueOf(Long.MAX_VALUE));
// disable tombstones
@@ -420,6 +434,7 @@ public class MySqlSourceConfigFactory implements
Serializable {
distributionFactorLower,
includeSchemaChanges,
includeHeartbeatEvents,
+ includeTransactionMetadataEvents,
scanNewlyAddedTableEnabled,
closeIdleReaders,
props,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index 486e637ce..79066a512 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -54,17 +54,20 @@ public class MySqlRecordEmitter<T> implements
RecordEmitter<SourceRecords, T, My
private final MySqlSourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges;
private final boolean includeHeartbeatEvents;
+ private final boolean includeTransactionMetadataEvents;
private final OutputCollector<T> outputCollector;
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges,
- boolean includeHeartbeatEvents) {
+ boolean includeHeartbeatEvents,
+ boolean includeTransactionMetadataEvents) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;
this.includeHeartbeatEvents = includeHeartbeatEvents;
+ this.includeTransactionMetadataEvents =
includeTransactionMetadataEvents;
this.outputCollector = new OutputCollector<>();
}
@@ -108,6 +111,11 @@ public class MySqlRecordEmitter<T> implements
RecordEmitter<SourceRecords, T, My
if (includeHeartbeatEvents) {
emitElement(element, output);
}
+ } else if (RecordUtils.isTransactionMetadataEvent(element)) {
+ updateStartingOffsetForSplit(splitState, element);
+ if (includeTransactionMetadataEvents) {
+ emitElement(element, output);
+ }
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index 8ae91ad9d..bf4d4f29f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -73,6 +73,8 @@ public class RecordUtils {
"io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
+ public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
+ "io.debezium.connector.common.TransactionMetadataKey";
private static final DocumentReader DOCUMENT_READER =
DocumentReader.defaultReader();
/** Converts a {@link ResultSet} row to an array of Objects. */
@@ -339,6 +341,18 @@ public class RecordUtils {
&&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
}
+ /**
+ * Check whether the given source record is a transaction metadata event
(BEGIN or END).
+ *
+ * <p>Transaction events are emitted by Debezium to mark transaction
boundaries when
+ * provide.transaction.metadata is enabled.
+ */
+ public static boolean isTransactionMetadataEvent(SourceRecord record) {
+ Schema keySchema = record.keySchema();
+ return keySchema != null
+ &&
SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
+ }
+
/**
* Return the finished snapshot split information.
*
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
index 7a3764461..22167e0e5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
@@ -23,6 +23,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -36,11 +37,17 @@ import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC;
import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME;
@@ -103,9 +110,233 @@ class MySqlRecordEmitterTest {
new MySqlSourceReaderMetrics(
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
false,
+ false,
false);
}
+ @Test
+ void testTransactionMetadataEventsDisabledByDefault() throws Exception {
+ SourceRecord transactionBeginEvent =
+ createTransactionMetadataEvent("BEGIN", "tx-123", 100L);
+
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+ .isTrue();
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithTransactionConfig(emittedRecordsCount,
false);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(transactionBeginEvent),
readerOutput, splitState);
+
+ // Verify the offset was updated (this should always happen)
+ BinlogOffset expectedOffset =
RecordUtils.getBinlogPosition(transactionBeginEvent);
+ Assertions.assertThat(splitState.getStartingOffset())
+ .isNotNull()
+ .isNotEqualTo(offsetBeforeEmit)
+ .isEqualByComparingTo(expectedOffset);
+
+ // Verify the event was NOT emitted (because
includeTransactionMetadataEvents=false)
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0);
+ Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty();
+ }
+
+ @Test
+ void testTransactionMetadataEventsEnabledExplicitly() throws Exception {
+ SourceRecord transactionBeginEvent =
+ createTransactionMetadataEvent("BEGIN", "tx-456", 150L);
+
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+ .isTrue();
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithTransactionConfig(emittedRecordsCount,
true);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(transactionBeginEvent),
readerOutput, splitState);
+
+ // Verify the offset was updated
+ BinlogOffset expectedOffset =
RecordUtils.getBinlogPosition(transactionBeginEvent);
+ Assertions.assertThat(splitState.getStartingOffset())
+ .isNotNull()
+ .isNotEqualTo(offsetBeforeEmit)
+ .isEqualByComparingTo(expectedOffset);
+
+ // Verify the event was emitted (because
includeTransactionMetadataEvents=true)
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+ Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1);
+ }
+
+ @Test
+ void testTransactionBeginEventHandling() throws Exception {
+ SourceRecord transactionBeginEvent =
+ createTransactionMetadataEvent("BEGIN", "tx-123", 100L);
+
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
+ .isTrue();
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithCounter(emittedRecordsCount);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(transactionBeginEvent),
readerOutput, splitState);
+
+ // Verify the offset was updated
+ BinlogOffset expectedOffset =
RecordUtils.getBinlogPosition(transactionBeginEvent);
+ Assertions.assertThat(splitState.getStartingOffset())
+ .isNotNull()
+ .isNotEqualTo(offsetBeforeEmit)
+ .isEqualByComparingTo(expectedOffset);
+
+ // Verify the event was emitted
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testTransactionEndEventHandling() throws Exception {
+ SourceRecord transactionEndEvent =
createTransactionMetadataEvent("END", "tx-123", 200L);
+
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)).isTrue();
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithCounter(emittedRecordsCount);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(transactionEndEvent),
readerOutput, splitState);
+
+ // Verify the offset was updated
+ BinlogOffset expectedOffset =
RecordUtils.getBinlogPosition(transactionEndEvent);
+ Assertions.assertThat(splitState.getStartingOffset())
+ .isNotNull()
+ .isEqualByComparingTo(expectedOffset);
+
+ // Verify the event was emitted
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testNonTransactionEventNotDetected() {
+ Schema keySchema = SchemaBuilder.struct().field("id",
Schema.INT32_SCHEMA).build();
+ Schema valueSchema = SchemaBuilder.struct().field("op",
Schema.STRING_SCHEMA).build();
+
+ Struct key = new Struct(keySchema).put("id", 1);
+ Struct value = new Struct(valueSchema).put("op", "c");
+
+ Map<String, Object> offset = new HashMap<>();
+ offset.put("file", "mysql-bin.000001");
+ offset.put("pos", 100L);
+
+ SourceRecord dataRecord =
+ new SourceRecord(
+ Collections.singletonMap("server", "mysql"),
+ offset,
+ "test.table",
+ keySchema,
+ key,
+ valueSchema,
+ value);
+
+ // Verify it's NOT detected as a transaction metadata event
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse();
+ }
+
+ @Test
+ void testTransactionEventWithoutKeySchemaNotDetected() {
+ Schema valueSchema =
+ SchemaBuilder.struct()
+
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
+ .field("status", Schema.STRING_SCHEMA)
+ .build();
+
+ Struct value = new Struct(valueSchema).put("status", "BEGIN");
+
+ Map<String, Object> offset = new HashMap<>();
+ offset.put("file", "mysql-bin.000001");
+ offset.put("pos", 100L);
+
+ SourceRecord record =
+ new SourceRecord(
+ Collections.singletonMap("server", "mysql"),
+ offset,
+ "transaction.topic",
+ null, // No key schema
+ null,
+ valueSchema,
+ value);
+
+ // Verify it's NOT detected as a transaction metadata event
+
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse();
+ }
+
+ @Test
+ void testMultipleTransactionEventsWithDisabledConfig() throws Exception {
+ SourceRecord beginEvent = createTransactionMetadataEvent("BEGIN",
"tx-789", 300L);
+ SourceRecord endEvent = createTransactionMetadataEvent("END",
"tx-789", 400L);
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithTransactionConfig(emittedRecordsCount,
false);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(beginEvent), readerOutput,
splitState);
+
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(endEvent), readerOutput,
splitState);
+
+ // Verify offsets were updated but no events were emitted
+ BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent);
+ Assertions.assertThat(splitState.getStartingOffset())
+ .isNotNull()
+ .isEqualByComparingTo(expectedOffset);
+
+ // Verify no events were emitted (because
includeTransactionMetadataEvents=false)
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0);
+ Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty();
+ }
+
+ @Test
+ void testMixedEventsWithTransactionMetadataDisabled() throws Exception {
+ SourceRecord transactionEvent =
createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L);
+ SourceRecord dataEvent = createDataChangeEvent("test.table", 501L);
+
+ AtomicInteger emittedRecordsCount = new AtomicInteger(0);
+ MySqlRecordEmitter<String> recordEmitter =
+ createRecordEmitterWithTransactionConfig(emittedRecordsCount,
false);
+ MySqlBinlogSplitState splitState = createBinlogSplitState();
+
+ TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(transactionEvent),
readerOutput, splitState);
+
+ recordEmitter.emitRecord(
+ SourceRecords.fromSingleRecord(dataEvent), readerOutput,
splitState);
+
+ // Verify only data event was emitted (count=1, not 2)
+ Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
+ Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1);
+ }
+
private MySqlBinlogSplitState createBinlogSplitState() {
return new MySqlBinlogSplitState(
new MySqlBinlogSplit(
@@ -116,4 +347,110 @@ class MySqlRecordEmitterTest {
Collections.emptyMap(),
0));
}
+
+ /** Helper method to create a MySqlRecordEmitter that counts emitted
records. */
+ private MySqlRecordEmitter<String>
createRecordEmitterWithCounter(AtomicInteger counter) {
+ return createRecordEmitterWithTransactionConfig(counter, true);
+ }
+
+ /**
+ * Helper method to create a MySqlRecordEmitter with configurable
transaction metadata events.
+ */
+ private MySqlRecordEmitter<String>
createRecordEmitterWithTransactionConfig(
+ AtomicInteger counter, boolean includeTransactionMetadataEvents) {
+ return new MySqlRecordEmitter<>(
+ new DebeziumDeserializationSchema<String>() {
+ @Override
+ public void deserialize(SourceRecord record,
Collector<String> out) {
+ counter.incrementAndGet();
+ out.collect("transaction-event");
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeInformation.of(String.class);
+ }
+ },
+ new MySqlSourceReaderMetrics(
+
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
+ false,
+ false,
+ includeTransactionMetadataEvents);
+ }
+
+ private SourceRecord createTransactionMetadataEvent(
+ String status, String transactionId, long position) {
+ Schema keySchema =
+ SchemaBuilder.struct()
+
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
+ .field("id", Schema.STRING_SCHEMA)
+ .build();
+
+ Schema valueSchema =
+ SchemaBuilder.struct()
+
.name("io.debezium.connector.common.TransactionMetadataValue")
+ .field("status", Schema.STRING_SCHEMA)
+ .field("id", Schema.STRING_SCHEMA)
+ .field("event_count", Schema.OPTIONAL_INT64_SCHEMA)
+ .field("ts_ms", Schema.INT64_SCHEMA)
+ .build();
+
+ Struct key = new Struct(keySchema).put("id", transactionId);
+
+ Struct value =
+ new Struct(valueSchema)
+ .put("status", status)
+ .put("id", transactionId)
+ .put("ts_ms", System.currentTimeMillis());
+
+ if ("END".equals(status)) {
+ value.put("event_count", 5L);
+ }
+
+ Map<String, Object> offset = new HashMap<>();
+ offset.put("file", "mysql-bin.000001");
+ offset.put("pos", position);
+ offset.put("transaction_id", transactionId);
+
+ return new SourceRecord(
+ Collections.singletonMap("server", "mysql_binlog_source"),
+ offset,
+ "mysql_binlog_source.transaction",
+ keySchema,
+ key,
+ valueSchema,
+ value);
+ }
+
+ private SourceRecord createDataChangeEvent(String topicName, long
position) {
+ Schema keySchema = SchemaBuilder.struct().field("id",
Schema.INT32_SCHEMA).build();
+ Schema valueSchema =
+ SchemaBuilder.struct()
+ .field("op", Schema.STRING_SCHEMA)
+ .field(
+ "after",
+ SchemaBuilder.struct()
+ .field("id", Schema.INT32_SCHEMA)
+ .field("name", Schema.STRING_SCHEMA)
+ .optional())
+ .build();
+
+ Struct key = new Struct(keySchema).put("id", 1);
+ Struct after =
+ new Struct(valueSchema.field("after").schema()).put("id",
1).put("name", "test");
+ Struct value = new Struct(valueSchema).put("op", "c").put("after",
after);
+
+ Map<String, Object> offset = new HashMap<>();
+ offset.put("file", "mysql-bin.000001");
+ offset.put("pos", position);
+
+ return new SourceRecord(
+ Collections.singletonMap("server", "mysql"),
+ offset,
+ topicName,
+ keySchema,
+ key,
+ valueSchema,
+ value);
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 08c337967..7c9ce5ba9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -574,7 +574,8 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
new ForwardDeserializeSchema(),
new
MySqlSourceReaderMetrics(readerContext.metricGroup()),
configuration.isIncludeSchemaChanges(),
- configuration.isIncludeHeartbeatEvents());
+ configuration.isIncludeHeartbeatEvents(),
+
configuration.isIncludeTransactionMetadataEvents());
final MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
return new MySqlSourceReader<>(
@@ -741,7 +742,12 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges,
int limit) {
- super(debeziumDeserializationSchema, sourceReaderMetrics,
includeSchemaChanges, false);
+ super(
+ debeziumDeserializationSchema,
+ sourceReaderMetrics,
+ includeSchemaChanges,
+ false,
+ false);
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;