This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new eee0cc06f [FLINK-37386][cdc-runtime] Emit CreateTableEvent only when met the related SourceRecord
eee0cc06f is described below

commit eee0cc06f7bf842034d0e22bc45f8ffb811f7f78
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 15:58:11 2025 +0800

    [FLINK-37386][cdc-runtime] Emit CreateTableEvent only when met the related SourceRecord
    
    This closes #3932.
---
 .../source/reader/MySqlPipelineRecordEmitter.java  | 47 +++++++--------
 .../mysql/source/MySqlPipelineITCase.java          | 70 +++++++---------------
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   | 13 +++-
 .../operators/sink/DataSinkWriterOperator.java     |  3 +-
 .../operators/transform/PreTransformOperator.java  | 37 ++++++++++--
 5 files changed, 89 insertions(+), 81 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 143efa120..de058df14 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
-import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
-import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -48,13 +46,18 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
 import static 
org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
+import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
+import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isLowWatermarkEvent;
+import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
 import static 
org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
 
 /** The {@link RecordEmitter} implementation for pipeline mysql connector. */
@@ -68,9 +71,7 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
     // Used when startup mode is initial
     private Set<TableId> alreadySendCreateTableTables;
 
-    // Used when startup mode is not initial
-    private boolean alreadySendCreateTableForBinlogSplit = false;
-    private List<CreateTableEvent> createTableEventCache;
+    private Map<TableId, CreateTableEvent> createTableEventCache;
 
     public MySqlPipelineRecordEmitter(
             DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -99,23 +100,17 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && 
!alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if 
(sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, 
ensure all table
-                // schemas have been sent to downstream. We use previously 
cached schema instead of
-                // re-request latest schema because there might be some 
pending schema change events
-                // in the queue, and that may accidentally emit evolved schema 
before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                
MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {
+                    CreateTableEvent createTableEvent = 
createTableEventCache.get(tableId);
+                    // New created table in binlog reading phase.
+                    if (createTableEvent != null) {
+                        output.collect(createTableEvent);
+                    }
+                    alreadySendCreateTableTables.add(tableId);
+                }
             }
         }
         super.processElement(element, output, splitState);
@@ -246,15 +241,17 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
         return mySqlAntlrDdlParser;
     }
 
-    private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig 
sourceConfig) {
+    private Map<TableId, CreateTableEvent> generateCreateTableEvent(
+            MySqlSourceConfig sourceConfig) {
         try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
-            List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+            Map<TableId, CreateTableEvent> createTableEventCache = new 
HashMap<>();
             List<TableId> capturedTableIds =
                     listTables(
                             jdbc, sourceConfig.getDatabaseFilter(), 
sourceConfig.getTableFilter());
             for (TableId tableId : capturedTableIds) {
                 Schema schema = getSchema(jdbc, tableId);
-                createTableEventCache.add(
+                createTableEventCache.put(
+                        tableId,
                         new CreateTableEvent(
                                 
org.apache.flink.cdc.common.event.TableId.tableId(
                                         tableId.catalog(), tableId.table()),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 7ef6cbeb1..a5239c137 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -634,9 +634,7 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                         .executeAndCollect();
         Thread.sleep(5_000);
 
-        List<Event> expected =
-                new ArrayList<>(
-                        
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+        List<Event> expected = new ArrayList<>();
 
         try (Connection connection = inventoryDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
@@ -645,6 +643,16 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                     String.format(
                             "ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1` 
INT NULL;",
                             inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new CreateTableEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.INT().notNull())
+                                    .physicalColumn("first_name", 
DataTypes.VARCHAR(255).notNull())
+                                    .physicalColumn("last_name", 
DataTypes.VARCHAR(255).notNull())
+                                    .physicalColumn("email", 
DataTypes.VARCHAR(255).notNull())
+                                    
.primaryKey(Collections.singletonList("id"))
+                                    .build()));
             expected.add(
                     new AddColumnEvent(
                             
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
@@ -737,7 +745,17 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
             statement.execute(
                     String.format(
                             "TRUNCATE TABLE `%s`.`orders`;", 
inventoryDatabase.getDatabaseName()));
-
+            expected.add(
+                    new CreateTableEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders"),
+                            Schema.newBuilder()
+                                    .physicalColumn("order_number", 
DataTypes.INT().notNull())
+                                    .physicalColumn("order_date", 
DataTypes.DATE().notNull())
+                                    .physicalColumn("purchaser", 
DataTypes.INT().notNull())
+                                    .physicalColumn("quantity", 
DataTypes.INT().notNull())
+                                    .physicalColumn("product_id", 
DataTypes.INT().notNull())
+                                    
.primaryKey(Collections.singletonList("order_number"))
+                                    .build()));
             expected.add(
                     new TruncateTableEvent(
                             
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders")));
@@ -1005,9 +1023,7 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                         .executeAndCollect();
         Thread.sleep(5_000);
 
-        List<Event> expectedEvents =
-                new ArrayList<>(
-                        
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+        List<Event> expectedEvents = new ArrayList<>();
 
         expectedEvents.add(
                 new DropTableEvent(
@@ -1192,46 +1208,6 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                         .build());
     }
 
-    private List<CreateTableEvent> getInventoryCreateAllTableEvents(String 
databaseName) {
-        return Arrays.asList(
-                new CreateTableEvent(
-                        TableId.tableId(databaseName, "products"),
-                        Schema.newBuilder()
-                                .physicalColumn("id", 
DataTypes.INT().notNull())
-                                .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), "flink")
-                                .physicalColumn("description", 
DataTypes.VARCHAR(512))
-                                .physicalColumn("weight", DataTypes.FLOAT())
-                                .primaryKey(Collections.singletonList("id"))
-                                .build()),
-                new CreateTableEvent(
-                        TableId.tableId(databaseName, "customers"),
-                        Schema.newBuilder()
-                                .physicalColumn("id", 
DataTypes.INT().notNull())
-                                .physicalColumn("first_name", 
DataTypes.VARCHAR(255).notNull())
-                                .physicalColumn("last_name", 
DataTypes.VARCHAR(255).notNull())
-                                .physicalColumn("email", 
DataTypes.VARCHAR(255).notNull())
-                                .primaryKey(Collections.singletonList("id"))
-                                .build()),
-                new CreateTableEvent(
-                        TableId.tableId(databaseName, "orders"),
-                        Schema.newBuilder()
-                                .physicalColumn("order_number", 
DataTypes.INT().notNull())
-                                .physicalColumn("order_date", 
DataTypes.DATE().notNull())
-                                .physicalColumn("purchaser", 
DataTypes.INT().notNull())
-                                .physicalColumn("quantity", 
DataTypes.INT().notNull())
-                                .physicalColumn("product_id", 
DataTypes.INT().notNull())
-                                
.primaryKey(Collections.singletonList("order_number"))
-                                .build()),
-                new CreateTableEvent(
-                        TableId.tableId(databaseName, "multi_max_table"),
-                        Schema.newBuilder()
-                                .physicalColumn("order_id", 
DataTypes.VARCHAR(128).notNull())
-                                .physicalColumn("index", 
DataTypes.INT().notNull())
-                                .physicalColumn("desc", 
DataTypes.VARCHAR(512).notNull())
-                                .primaryKey(Arrays.asList("order_id", "index"))
-                                .build()));
-    }
-
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 9669b8437..85521d302 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -554,9 +554,18 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
         submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
+        try (Connection connection = 
mysqlInventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "UPDATE products SET description='18oz carpenter hammer' 
WHERE id=106;");
+        }
         validateResult(
-                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` 
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
-                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` 
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` 
STRING}, primaryKeys=id, options=()}");
+                String.format(
+                        "CreateTableEvent{tableId=%s.products, 
schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 
'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 
'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
+                        mysqlInventoryDatabase.getDatabaseName()),
+                String.format(
+                        "DataChangeEvent{tableId=%s.products, before=[106, 
hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 
18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+                        mysqlInventoryDatabase.getDatabaseName()));
     }
 
     private void validateResult(String... expectedEvents) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 472ed85ee..5f9a394c2 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -199,7 +199,8 @@ public class DataSinkWriterOperator<CommT> extends 
AbstractStreamOperator<Commit
 
     private void handleFlushEvent(FlushEvent event) throws Exception {
         copySinkWriter.flush(false);
-        if (event.getSchemaChangeEventType() != 
SchemaChangeEventType.CREATE_TABLE) {
+        if (event.getSchemaChangeEventType() != 
SchemaChangeEventType.CREATE_TABLE
+                && event.getSchemaChangeEventType() != 
SchemaChangeEventType.DROP_TABLE) {
             event.getTableIds().stream()
                     .filter(tableId -> !processedTableIds.contains(tableId))
                     .forEach(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 845fd4df1..29142a54f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.DropTableEvent;
@@ -52,10 +53,12 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -70,6 +73,10 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
     private final List<TransformRule> transformRules;
     private transient List<PreTransformer> transforms;
     private final Map<TableId, PreTransformChangeInfo> 
preTransformChangeInfoMap;
+
+    /** All tables which have been sent {@link CreateTableEvent} to 
downstream. */
+    private final Set<TableId> alreadySentCreateTableEvents;
+
     private final List<Tuple2<Selectors, SchemaMetadataTransform>> 
schemaMetadataTransformers;
     private transient ListState<byte[]> state;
     private final List<Tuple3<String, String, Map<String, String>>> 
udfFunctions;
@@ -150,6 +157,7 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
             List<Tuple3<String, String, Map<String, String>>> udfFunctions,
             boolean canContainDistributedTables) {
         this.preTransformChangeInfoMap = new ConcurrentHashMap<>();
+        this.alreadySentCreateTableEvents = new HashSet<>();
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.schemaMetadataTransformers = new ArrayList<>();
         this.chainingStrategy = ChainingStrategy.ALWAYS;
@@ -224,10 +232,6 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
                                 
stateTableChangeInfo.getPreTransformedSchema());
                 // hasAsteriskMap needs to be recalculated after restoring 
from a checkpoint.
                 cacheTransformRuleInfo(restoredCreateTableEvent);
-
-                // Since PostTransformOperator doesn't preserve state, 
pre-transformed schema
-                // information needs to be passed by PreTransformOperator.
-                output.collect(new StreamRecord<>(restoredCreateTableEvent));
             }
         }
     }
@@ -272,23 +276,44 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
         Event event = element.getValue();
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
-            preTransformProcessorMap.remove(createTableEvent.tableId());
-            output.collect(new 
StreamRecord<>(cacheCreateTable(createTableEvent)));
+            // CreateTableEvent from the source contains the latest schema,
+            // which may differ from the schema currently being processed.
+            if 
(!preTransformProcessorMap.containsKey(createTableEvent.tableId())) {
+                output.collect(new 
StreamRecord<>(cacheCreateTable(createTableEvent)));
+                alreadySentCreateTableEvents.add(createTableEvent.tableId());
+            }
         } else if (event instanceof DropTableEvent) {
             preTransformProcessorMap.remove(((DropTableEvent) 
event).tableId());
             output.collect(new StreamRecord<>(event));
         } else if (event instanceof TruncateTableEvent) {
             output.collect(new StreamRecord<>(event));
         } else if (event instanceof SchemaChangeEvent) {
+            lazilyEmitCreateTableEvent(event);
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             preTransformProcessorMap.remove(schemaChangeEvent.tableId());
             cacheChangeSchema(schemaChangeEvent)
                     .ifPresent(e -> output.collect(new StreamRecord<>(e)));
         } else if (event instanceof DataChangeEvent) {
+            lazilyEmitCreateTableEvent(event);
             output.collect(new 
StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event))));
         }
     }
 
+    /** Emits a CreateTableEvent built from the stored pre-transformed schema, once per table. */
+    private void lazilyEmitCreateTableEvent(Event event) {
+        ChangeEvent changeEvent = (ChangeEvent) event;
+        if (!alreadySentCreateTableEvents.contains(changeEvent.tableId())) {
+            PreTransformChangeInfo stateTableChangeInfo =
+                    preTransformChangeInfoMap.get(changeEvent.tableId());
+            CreateTableEvent createTableEvent =
+                    new CreateTableEvent(
+                            stateTableChangeInfo.getTableId(),
+                            stateTableChangeInfo.getPreTransformedSchema());
+            output.collect(new StreamRecord<>(createTableEvent));
+            alreadySentCreateTableEvents.add(changeEvent.tableId());
+        }
+    }
+
     private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
         TableId tableId = event.tableId();
         Schema originalSchema = event.getSchema();

Reply via email to