This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new eee0cc06f [FLINK-37386][cdc-runtime] Emit CreateTableEvent only when
met the related SourceRecord
eee0cc06f is described below
commit eee0cc06f7bf842034d0e22bc45f8ffb811f7f78
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 15:58:11 2025 +0800
[FLINK-37386][cdc-runtime] Emit CreateTableEvent only when met the related
SourceRecord
This closes #3932.
---
.../source/reader/MySqlPipelineRecordEmitter.java | 47 +++++++--------
.../mysql/source/MySqlPipelineITCase.java | 70 +++++++---------------
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 13 +++-
.../operators/sink/DataSinkWriterOperator.java | 3 +-
.../operators/transform/PreTransformOperator.java | 37 ++++++++++--
5 files changed, 89 insertions(+), 81 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 143efa120..de058df14 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -27,8 +27,6 @@ import
org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
-import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
-import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -48,13 +46,18 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static
org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
+import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
+import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isLowWatermarkEvent;
+import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static
org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
/** The {@link RecordEmitter} implementation for pipeline mysql connector. */
@@ -68,9 +71,7 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
// Used when startup mode is initial
private Set<TableId> alreadySendCreateTableTables;
- // Used when startup mode is not initial
- private boolean alreadySendCreateTableForBinlogSplit = false;
- private List<CreateTableEvent> createTableEventCache;
+ private Map<TableId, CreateTableEvent> createTableEventCache;
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -99,23 +100,17 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
alreadySendCreateTableTables.add(tableId);
}
}
- } else if (splitState.isBinlogSplitState() &&
!alreadySendCreateTableForBinlogSplit) {
- alreadySendCreateTableForBinlogSplit = true;
- if
(sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
- // In Snapshot -> Binlog transition of INITIAL startup mode,
ensure all table
- // schemas have been sent to downstream. We use previously
cached schema instead of
- // re-request latest schema because there might be some
pending schema change events
- // in the queue, and that may accidentally emit evolved schema
before corresponding
- // schema change events.
- createTableEventCache.stream()
- .filter(
- event ->
- !alreadySendCreateTableTables.contains(
-
MySqlSchemaUtils.toDbzTableId(event.tableId())))
- .forEach(output::collect);
- } else {
- // In Binlog only mode, we simply emit all schemas at once.
- createTableEventCache.forEach(output::collect);
+ } else {
+ if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+ TableId tableId = getTableId(element);
+ if (!alreadySendCreateTableTables.contains(tableId)) {
+ CreateTableEvent createTableEvent =
createTableEventCache.get(tableId);
+ // A null cache entry presumably means the table was created during the binlog reading phase, so no cached CreateTableEvent exists for it.
+ if (createTableEvent != null) {
+ output.collect(createTableEvent);
+ }
+ alreadySendCreateTableTables.add(tableId);
+ }
}
}
super.processElement(element, output, splitState);
@@ -246,15 +241,17 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
return mySqlAntlrDdlParser;
}
- private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig
sourceConfig) {
+ private Map<TableId, CreateTableEvent> generateCreateTableEvent(
+ MySqlSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
- List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+ Map<TableId, CreateTableEvent> createTableEventCache = new
HashMap<>();
List<TableId> capturedTableIds =
listTables(
jdbc, sourceConfig.getDatabaseFilter(),
sourceConfig.getTableFilter());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
- createTableEventCache.add(
+ createTableEventCache.put(
+ tableId,
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 7ef6cbeb1..a5239c137 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -634,9 +634,7 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
.executeAndCollect();
Thread.sleep(5_000);
- List<Event> expected =
- new ArrayList<>(
-
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+ List<Event> expected = new ArrayList<>();
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@@ -645,6 +643,16 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
String.format(
"ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1`
INT NULL;",
inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new CreateTableEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.INT().notNull())
+ .physicalColumn("first_name",
DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("last_name",
DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("email",
DataTypes.VARCHAR(255).notNull())
+
.primaryKey(Collections.singletonList("id"))
+ .build()));
expected.add(
new AddColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
@@ -737,7 +745,17 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
statement.execute(
String.format(
"TRUNCATE TABLE `%s`.`orders`;",
inventoryDatabase.getDatabaseName()));
-
+ expected.add(
+ new CreateTableEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders"),
+ Schema.newBuilder()
+ .physicalColumn("order_number",
DataTypes.INT().notNull())
+ .physicalColumn("order_date",
DataTypes.DATE().notNull())
+ .physicalColumn("purchaser",
DataTypes.INT().notNull())
+ .physicalColumn("quantity",
DataTypes.INT().notNull())
+ .physicalColumn("product_id",
DataTypes.INT().notNull())
+
.primaryKey(Collections.singletonList("order_number"))
+ .build()));
expected.add(
new TruncateTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders")));
@@ -1005,9 +1023,7 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
.executeAndCollect();
Thread.sleep(5_000);
- List<Event> expectedEvents =
- new ArrayList<>(
-
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+ List<Event> expectedEvents = new ArrayList<>();
expectedEvents.add(
new DropTableEvent(
@@ -1192,46 +1208,6 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
.build());
}
- private List<CreateTableEvent> getInventoryCreateAllTableEvents(String
databaseName) {
- return Arrays.asList(
- new CreateTableEvent(
- TableId.tableId(databaseName, "products"),
- Schema.newBuilder()
- .physicalColumn("id",
DataTypes.INT().notNull())
- .physicalColumn("name",
DataTypes.VARCHAR(255).notNull(), "flink")
- .physicalColumn("description",
DataTypes.VARCHAR(512))
- .physicalColumn("weight", DataTypes.FLOAT())
- .primaryKey(Collections.singletonList("id"))
- .build()),
- new CreateTableEvent(
- TableId.tableId(databaseName, "customers"),
- Schema.newBuilder()
- .physicalColumn("id",
DataTypes.INT().notNull())
- .physicalColumn("first_name",
DataTypes.VARCHAR(255).notNull())
- .physicalColumn("last_name",
DataTypes.VARCHAR(255).notNull())
- .physicalColumn("email",
DataTypes.VARCHAR(255).notNull())
- .primaryKey(Collections.singletonList("id"))
- .build()),
- new CreateTableEvent(
- TableId.tableId(databaseName, "orders"),
- Schema.newBuilder()
- .physicalColumn("order_number",
DataTypes.INT().notNull())
- .physicalColumn("order_date",
DataTypes.DATE().notNull())
- .physicalColumn("purchaser",
DataTypes.INT().notNull())
- .physicalColumn("quantity",
DataTypes.INT().notNull())
- .physicalColumn("product_id",
DataTypes.INT().notNull())
-
.primaryKey(Collections.singletonList("order_number"))
- .build()),
- new CreateTableEvent(
- TableId.tableId(databaseName, "multi_max_table"),
- Schema.newBuilder()
- .physicalColumn("order_id",
DataTypes.VARCHAR(128).notNull())
- .physicalColumn("index",
DataTypes.INT().notNull())
- .physicalColumn("desc",
DataTypes.VARCHAR(512).notNull())
- .primaryKey(Arrays.asList("order_id", "index"))
- .build()));
- }
-
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 9669b8437..85521d302 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -554,9 +554,18 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar,
mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
+ try (Connection connection =
mysqlInventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE products SET description='18oz carpenter hammer'
WHERE id=106;");
+ }
validateResult(
- "CreateTableEvent{tableId=%s.customers, schema=columns={`id`
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address`
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
- "CreateTableEvent{tableId=%s.products, schema=columns={`id`
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description`
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c`
STRING}, primaryKeys=id, options=()}");
+ String.format(
+ "CreateTableEvent{tableId=%s.products,
schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL
'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING
'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[106,
hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer,
18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()));
}
private void validateResult(String... expectedEvents) throws Exception {
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 472ed85ee..5f9a394c2 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -199,7 +199,8 @@ public class DataSinkWriterOperator<CommT> extends
AbstractStreamOperator<Commit
private void handleFlushEvent(FlushEvent event) throws Exception {
copySinkWriter.flush(false);
- if (event.getSchemaChangeEventType() !=
SchemaChangeEventType.CREATE_TABLE) {
+ if (event.getSchemaChangeEventType() !=
SchemaChangeEventType.CREATE_TABLE
+ && event.getSchemaChangeEventType() !=
SchemaChangeEventType.DROP_TABLE) {
event.getTableIds().stream()
.filter(tableId -> !processedTableIds.contains(tableId))
.forEach(
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 845fd4df1..29142a54f 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
@@ -52,10 +53,12 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -70,6 +73,10 @@ public class PreTransformOperator extends
AbstractStreamOperator<Event>
private final List<TransformRule> transformRules;
private transient List<PreTransformer> transforms;
private final Map<TableId, PreTransformChangeInfo>
preTransformChangeInfoMap;
+
+ /** Tables whose {@link CreateTableEvent} has already been sent downstream. */
+ private final Set<TableId> alreadySentCreateTableEvents;
+
private final List<Tuple2<Selectors, SchemaMetadataTransform>>
schemaMetadataTransformers;
private transient ListState<byte[]> state;
private final List<Tuple3<String, String, Map<String, String>>>
udfFunctions;
@@ -150,6 +157,7 @@ public class PreTransformOperator extends
AbstractStreamOperator<Event>
List<Tuple3<String, String, Map<String, String>>> udfFunctions,
boolean canContainDistributedTables) {
this.preTransformChangeInfoMap = new ConcurrentHashMap<>();
+ this.alreadySentCreateTableEvents = new HashSet<>();
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.schemaMetadataTransformers = new ArrayList<>();
this.chainingStrategy = ChainingStrategy.ALWAYS;
@@ -224,10 +232,6 @@ public class PreTransformOperator extends
AbstractStreamOperator<Event>
stateTableChangeInfo.getPreTransformedSchema());
// hasAsteriskMap needs to be recalculated after restoring
from a checkpoint.
cacheTransformRuleInfo(restoredCreateTableEvent);
-
- // Since PostTransformOperator doesn't preserve state,
pre-transformed schema
- // information needs to be passed by PreTransformOperator.
- output.collect(new StreamRecord<>(restoredCreateTableEvent));
}
}
}
@@ -272,23 +276,44 @@ public class PreTransformOperator extends
AbstractStreamOperator<Event>
Event event = element.getValue();
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
- preTransformProcessorMap.remove(createTableEvent.tableId());
- output.collect(new
StreamRecord<>(cacheCreateTable(createTableEvent)));
+ // A CreateTableEvent from the source carries the latest schema, which may differ from the schema
+ // currently being processed, so it is only forwarded when preTransformProcessorMap has no entry for the table.
+ if
(!preTransformProcessorMap.containsKey(createTableEvent.tableId())) {
+ output.collect(new
StreamRecord<>(cacheCreateTable(createTableEvent)));
+ alreadySentCreateTableEvents.add(createTableEvent.tableId());
+ }
} else if (event instanceof DropTableEvent) {
preTransformProcessorMap.remove(((DropTableEvent)
event).tableId());
output.collect(new StreamRecord<>(event));
} else if (event instanceof TruncateTableEvent) {
output.collect(new StreamRecord<>(event));
} else if (event instanceof SchemaChangeEvent) {
+ lazilyEmitCreateTableEvent(event);
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
preTransformProcessorMap.remove(schemaChangeEvent.tableId());
cacheChangeSchema(schemaChangeEvent)
.ifPresent(e -> output.collect(new StreamRecord<>(e)));
} else if (event instanceof DataChangeEvent) {
+ lazilyEmitCreateTableEvent(event);
output.collect(new
StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event))));
}
}
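+ /** Emits the table's cached CreateTableEvent the first time a ChangeEvent for that table is seen; assumes preTransformChangeInfoMap already holds an entry for the table (the result of get() is not null-checked). */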
+ private void lazilyEmitCreateTableEvent(Event event) {
+ ChangeEvent changeEvent = (ChangeEvent) event;
+ if (!alreadySentCreateTableEvents.contains(changeEvent.tableId())) {
+ PreTransformChangeInfo stateTableChangeInfo =
+ preTransformChangeInfoMap.get(changeEvent.tableId());
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ stateTableChangeInfo.getTableId(),
+ stateTableChangeInfo.getPreTransformedSchema());
+ output.collect(new StreamRecord<>(createTableEvent));
+ alreadySentCreateTableEvents.add(changeEvent.tableId());
+ }
+ }
+
private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
TableId tableId = event.tableId();
Schema originalSchema = event.getSchema();