This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fe46cb6 [FLINK-38045] Build createTableEventCache using TableSchemas from split and Make transform operator stateless (#4056)
d0fe46cb6 is described below

commit d0fe46cb6f9aa61fda227f1968a8f8f008ecca45
Author: proletarians <75650402+proletari...@users.noreply.github.com>
AuthorDate: Fri Aug 15 10:58:37 2025 +0800

    [FLINK-38045] Build createTableEventCache using TableSchemas from split and Make transform operator stateless (#4056)
    
    Co-authored-by: wuzexian <shanqing....@alibaba-inc.com>
---
 .../cdc/composer/flink/FlinkPipelineComposer.java  |  10 +-
 .../flink/translator/TransformTranslator.java      |  22 +---
 .../source/reader/MySqlPipelineRecordEmitter.java  |  38 +++++-
 .../cdc/connectors/mysql/utils/MySqlTypeUtils.java |   3 +-
 .../mysql/source/MySqlPipelineITCase.java          | 128 ++++++++++++++++++++-
 .../cdc/connectors/values/ValuesDatabase.java      |  11 +-
 .../cdc/connectors/values/sink/ValuesDataSink.java |   2 +-
 .../values/sink/ValuesDataSinkOptions.java         |   2 +-
 .../event/DebeziumEventDeserializationSchema.java  |  22 ++++
 .../cdc/connectors/mysql/source/MySqlSource.java   |   4 +-
 .../mysql/source/reader/MySqlRecordEmitter.java    |   3 +
 .../mysql/source/reader/MySqlSourceReader.java     |  12 +-
 .../mysql/source/reader/MySqlSourceReaderTest.java |  12 +-
 .../tests/migration/YamlJobMigrationITCase.java    |  89 ++++++++++++++
 .../operators/transform/PreTransformOperator.java  |  94 ++-------------
 .../transform/PreTransformOperatorBuilder.java     |   9 +-
 16 files changed, 331 insertions(+), 130 deletions(-)
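
For context before reading the diff: below is a minimal, self-contained sketch of the idea behind
this change, using simplified stand-in types (Split, TableSchema and CreateTableEvent here are
illustrations only, not the actual Flink CDC classes such as MySqlSplit or
MySqlPipelineRecordEmitter). Each split already carries the schemas of the tables it covers, so
the record emitter can rebuild its createTableEventCache directly from the split it is handed,
and the pre-transform operator no longer needs to persist schemas in operator state.

    // Hypothetical, simplified types for illustration; the real emitter works on
    // MySqlSplit#getTableSchemas() and Debezium TableChange objects instead.
    import java.util.HashMap;
    import java.util.Map;

    public class CreateTableEventCacheSketch {

        record TableSchema(String ddl) {}
        record CreateTableEvent(String tableId, String ddl) {}
        record Split(Map<String, TableSchema> tableSchemas) {}

        // Cache keyed by table id, filled from every split handed to the reader.
        private final Map<String, CreateTableEvent> createTableEventCache = new HashMap<>();

        // Loosely mirrors the applySplit() hook added in this commit: every schema known
        // to the split becomes a cached CreateTableEvent, so nothing has to be restored
        // from downstream operator state after a restart.
        public void applySplit(Split split) {
            split.tableSchemas().forEach((id, schema) ->
                    createTableEventCache.putIfAbsent(id, new CreateTableEvent(id, schema.ddl())));
        }

        public static void main(String[] args) {
            CreateTableEventCacheSketch emitter = new CreateTableEventCacheSketch();
            emitter.applySplit(new Split(Map.of(
                    "inventory.products", new TableSchema("CREATE TABLE products (...)"))));
            System.out.println(emitter.createTableEventCache.keySet());
        }
    }

In the actual change this is wired through MySqlSourceReader#initializedState, which calls the
new MySqlRecordEmitter#applySplit hook before the split is processed, as shown in the diff below.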

diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index f77fc4bbf..6de9045af 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -186,9 +187,7 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                         pipelineDef.getTransforms(),
                         pipelineDef.getUdfs(),
                         pipelineDef.getModels(),
-                        dataSource.supportedMetadataColumns(),
-                        !isParallelMetadataSource && !isBatchMode,
-                        operatorUidGenerator);
+                        dataSource.supportedMetadataColumns());
 
         // PreTransform ---> PostTransform
         stream =
@@ -290,4 +289,9 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
         }
         return Optional.of(container);
     }
+
+    @VisibleForTesting
+    public StreamExecutionEnvironment getEnv() {
+        return env;
+    }
 }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index 33bff17ee..fb8da7cea 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -50,30 +50,21 @@ public class TransformTranslator {
             List<TransformDef> transforms,
             List<UdfDef> udfFunctions,
             List<ModelDef> models,
-            SupportedMetadataColumn[] supportedMetadataColumns,
-            boolean shouldStoreSchemasInState,
-            OperatorUidGenerator operatorUidGenerator) {
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         if (transforms.isEmpty()) {
             return input;
         }
         return input.transform(
-                        "Transform:Schema",
-                        new EventTypeInfo(),
-                        generatePreTransform(
-                                transforms,
-                                udfFunctions,
-                                models,
-                                supportedMetadataColumns,
-                                shouldStoreSchemasInState))
-                .uid(operatorUidGenerator.generateUid("pre-transform"));
+                "Transform:Schema",
+                new EventTypeInfo(),
+                generatePreTransform(transforms, udfFunctions, models, 
supportedMetadataColumns));
     }
 
     private PreTransformOperator generatePreTransform(
             List<TransformDef> transforms,
             List<UdfDef> udfFunctions,
             List<ModelDef> models,
-            SupportedMetadataColumn[] supportedMetadataColumns,
-            boolean shouldStoreSchemasInState) {
+            SupportedMetadataColumn[] supportedMetadataColumns) {
 
         PreTransformOperatorBuilder preTransformFunctionBuilder = 
PreTransformOperator.newBuilder();
         for (TransformDef transform : transforms) {
@@ -94,8 +85,7 @@ public class TransformTranslator {
                                 .map(this::udfDefToUDFTuple)
                                 .collect(Collectors.toList()))
                 .addUdfFunctions(
-                        
models.stream().map(this::modelToUDFTuple).collect(Collectors.toList()))
-                .shouldStoreSchemasInState(shouldStoreSchemasInState);
+                        
models.stream().map(this::modelToUDFTuple).collect(Collectors.toList()));
 
         return preTransformFunctionBuilder.build();
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 649afb55c..3fa9cbb78 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -26,11 +26,14 @@ import 
org.apache.flink.cdc.connectors.mysql.schema.MySqlFieldDefinition;
 import org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -40,6 +43,7 @@ import 
io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.text.ParsingException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -77,7 +81,9 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
 
-    private Map<TableId, CreateTableEvent> createTableEventCache;
+    private final DebeziumDeserializationSchema<Event> 
debeziumDeserializationSchema;
+
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
 
     public MySqlPipelineRecordEmitter(
             DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -87,12 +93,32 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 sourceConfig.isIncludeSchemaChanges());
+        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceConfig = sourceConfig;
         this.alreadySendCreateTableTables = new HashSet<>();
-        this.createTableEventCache = generateCreateTableEvent(sourceConfig);
+        this.createTableEventCache =
+                ((DebeziumEventDeserializationSchema) 
debeziumDeserializationSchema)
+                        .getCreateTableEventCache();
         this.isBounded = 
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
     }
 
+    @Override
+    public void applySplit(MySqlSplit split) {
+        if (isBounded && createTableEventCache.isEmpty() && split instanceof MySqlSnapshotSplit) {
+            // The table schemas carried by a MySqlSnapshotSplit cover only a single table, so
+            // rebuild the full cache from the source configuration instead.
+            createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
+        } else {
+            for (TableChanges.TableChange tableChange : 
split.getTableSchemas().values()) {
+                CreateTableEvent createTableEvent =
+                        new CreateTableEvent(
+                                toCdcTableId(tableChange.getId()),
+                                buildSchemaFromTable(tableChange.getTable()));
+                ((DebeziumEventDeserializationSchema) 
debeziumDeserializationSchema)
+                        .applyChangeEvent(createTableEvent);
+            }
+        }
+    }
+
     @Override
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState 
splitState)
@@ -130,6 +156,11 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
         super.processElement(element, output, splitState);
     }
 
+    private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId 
dbzTableId) {
+        return org.apache.flink.cdc.common.event.TableId.tableId(
+                dbzTableId.catalog(), dbzTableId.table());
+    }
+
     private void sendCreateTableEvent(
             JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
         Schema schema = getSchema(jdbc, tableId);
@@ -204,7 +235,10 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
 
     private Schema parseDDL(String ddlStatement, TableId tableId) {
         Table table = parseDdl(ddlStatement, tableId);
+        return buildSchemaFromTable(table);
+    }
 
+    private Schema buildSchemaFromTable(Table table) {
         List<Column> columns = table.columns();
         Schema.Builder tableBuilder = Schema.newBuilder();
         for (int i = 0; i < columns.size(); i++) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
index 0c59ec976..ceb5cd825 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
@@ -129,7 +129,8 @@ public class MySqlTypeUtils {
         String typeName = column.typeName();
         switch (typeName) {
             case BIT:
-                return column.length() == 1
+                // column.length() might be -1
+                return column.length() <= 1
                         ? DataTypes.BOOLEAN()
                         : DataTypes.BINARY((column.length() + 7) / 8);
             case BOOL:
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index df5e930ad..b6760ce26 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -60,6 +60,8 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.lifecycle.Startables;
 
 import java.sql.Connection;
@@ -399,7 +401,131 @@ class MySqlPipelineITCase extends MySqlSourceTestBase {
     }
 
     @Test
-    void testExcludeTables() throws Exception {
+    void testLatestOffsetStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(inventoryDatabase.getDatabaseName() + 
"\\.products")
+                        .startupOptions(StartupOptions.latest())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new 
MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(10_000);
+        TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), 
"products");
+
+        List<Event> expectedBinlog = new ArrayList<>();
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, 
statement));
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.FLOAT(),
+                                DataTypes.VARCHAR(45),
+                                DataTypes.VARCHAR(55)
+                            },
+                            new String[] {"id", "name", "weight", "col1", 
"col2"});
+            BinaryRecordDataGenerator generator = new 
BinaryRecordDataGenerator(rowType);
+            // insert more data
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`products` VALUES 
(default,'scooter',5.5,'c-10','c-20');",
+                            inventoryDatabase.getDatabaseName())); // 110
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-10"),
+                                        BinaryStringData.fromString("c-20")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`products` VALUES 
(default,'football',6.6,'c-11','c-21');",
+                            inventoryDatabase.getDatabaseName())); // 111
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        111,
+                                        
BinaryStringData.fromString("football"),
+                                        6.6f,
+                                        BinaryStringData.fromString("c-11"),
+                                        BinaryStringData.fromString("c-21")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "UPDATE `%s`.`products` SET `col1`='c-12', 
`col2`='c-22' WHERE id=110;",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.updateEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-10"),
+                                        BinaryStringData.fromString("c-20")
+                                    }),
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-12"),
+                                        BinaryStringData.fromString("c-22")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "DELETE FROM `%s`.`products` WHERE `id` = 111;",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.deleteEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        111,
+                                        
BinaryStringData.fromString("football"),
+                                        6.6f,
+                                        BinaryStringData.fromString("c-11"),
+                                        BinaryStringData.fromString("c-21")
+                                    })));
+        }
+        // In this configuration, several subtasks might emit their corresponding CreateTableEvent
+        // downstream. Since it is not possible to predict how many CreateTableEvents to expect,
+        // we simply filter them out of the expected set and assert that there is at least one.
+
+        Event createTableEvent = getProductsCreateTableEvent(tableId);
+        List<Event> actual = fetchResultsExcept(events, expectedBinlog.size(), 
createTableEvent);
+        assertThat(actual).isEqualTo(expectedBinlog);
+    }
+
+    @ParameterizedTest(name = "batchEmit: {0}")
+    @ValueSource(booleans = {true, false})
+    void testExcludeTables(boolean inBatch) throws Exception {
         inventoryDatabase.createAndInitialize();
         String databaseName = inventoryDatabase.getDatabaseName();
         MySqlSourceConfigFactory configFactory =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 0004961f3..01c8c7bde 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -81,8 +81,15 @@ public class ValuesDatabase {
 
         private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
 
+        private final boolean materializedInMemory;
+
         public ValuesMetadataApplier() {
+            this(true);
+        }
+
+        public ValuesMetadataApplier(boolean materializedInMemory) {
             this.enabledSchemaEvolutionTypes = 
getSupportedSchemaEvolutionTypes();
+            this.materializedInMemory = materializedInMemory;
         }
 
         @Override
@@ -109,7 +116,9 @@ public class ValuesDatabase {
 
         @Override
         public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-            applySchemaChangeEvent(schemaChangeEvent);
+            if (materializedInMemory) {
+                applySchemaChangeEvent(schemaChangeEvent);
+            }
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java
index 7d4ee3dad..93871f2a4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java
@@ -80,7 +80,7 @@ public class ValuesDataSink implements DataSink, Serializable 
{
         if (errorOnSchemaChange) {
             return new ValuesDatabase.ErrorOnChangeMetadataApplier();
         } else {
-            return new ValuesDatabase.ValuesMetadataApplier();
+            return new 
ValuesDatabase.ValuesMetadataApplier(materializedInMemory);
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java
index 4830d102b..94ced1e3e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java
@@ -28,7 +28,7 @@ public class ValuesDataSinkOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "True if the DataChangeEvent need to be 
materialized in memory.");
+                            "True if the SchemaChangeEvent and DataChangeEvent 
need to be materialized in memory.");
 
     public static final ConfigOption<Boolean> PRINT_ENABLED =
             ConfigOptions.key("print.enabled")
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index ad7ec458e..338f03d23 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
@@ -59,6 +61,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -82,10 +85,13 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
     /** Changelog Mode to use for encoding changes in Flink internal data 
structure. */
     protected final DebeziumChangelogMode changelogMode;
 
+    private final Map<io.debezium.relational.TableId, CreateTableEvent> 
createTableEventCache;
+
     public DebeziumEventDeserializationSchema(
             SchemaDataTypeInference schemaDataTypeInference, 
DebeziumChangelogMode changelogMode) {
         this.schemaDataTypeInference = schemaDataTypeInference;
         this.changelogMode = changelogMode;
+        this.createTableEventCache = new HashMap<>();
     }
 
     @Override
@@ -435,4 +441,20 @@ public abstract class DebeziumEventDeserializationSchema 
extends SourceRecordEve
             }
         };
     }
+
+    public Map<io.debezium.relational.TableId, CreateTableEvent> 
getCreateTableEventCache() {
+        return createTableEventCache;
+    }
+
+    public void applyChangeEvent(ChangeEvent changeEvent) {
+        org.apache.flink.cdc.common.event.TableId flinkTableId = 
changeEvent.tableId();
+
+        io.debezium.relational.TableId debeziumTableId =
+                new io.debezium.relational.TableId(
+                        flinkTableId.getNamespace(),
+                        flinkTableId.getSchemaName(),
+                        flinkTableId.getTableName());
+
+        createTableEventCache.put(debeziumTableId, (CreateTableEvent) 
changeEvent);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index d68138c07..cb06cc45a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -48,7 +48,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderCont
 import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
-import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import 
org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -289,7 +288,6 @@ public class MySqlSource<T>
     @FunctionalInterface
     interface RecordEmitterSupplier<T> extends Serializable {
 
-        RecordEmitter<SourceRecords, T, MySqlSplitState> get(
-                MySqlSourceReaderMetrics metrics, MySqlSourceConfig 
sourceConfig);
+        MySqlRecordEmitter<T> get(MySqlSourceReaderMetrics metrics, 
MySqlSourceConfig sourceConfig);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index e3c504113..449e7f608 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.source.reader;
 import org.apache.flink.api.connector.source.SourceOutput;
 import 
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
@@ -120,6 +121,8 @@ public class MySqlRecordEmitter<T> implements 
RecordEmitter<SourceRecords, T, My
         debeziumDeserializationSchema.deserialize(element, outputCollector);
     }
 
+    public void applySplit(MySqlSplit split) {}
+
     private void reportMetrics(SourceRecord element) {
 
         Long messageTimestamp = RecordUtils.getMessageTimestamp(element);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index d9bfa18e7..e1d7e7dce 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
@@ -82,13 +81,14 @@ public class MySqlSourceReader<T>
     private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
     private final int subtaskId;
     private final MySqlSourceReaderContext mySqlSourceReaderContext;
-    private final MySqlPartition partition;
     private volatile MySqlBinlogSplit suspendedBinlogSplit;
+    private final MySqlRecordEmitter<T> recordEmitter;
+    private final MySqlPartition partition;
 
     public MySqlSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementQueue,
             Supplier<MySqlSplitReader> splitReaderSupplier,
-            RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
+            MySqlRecordEmitter<T> recordEmitter,
             Configuration config,
             MySqlSourceReaderContext context,
             MySqlSourceConfig sourceConfig) {
@@ -98,6 +98,7 @@ public class MySqlSourceReader<T>
                 recordEmitter,
                 config,
                 context.getSourceReaderContext());
+        this.recordEmitter = recordEmitter;
         this.sourceConfig = sourceConfig;
         this.finishedUnackedSplits = new HashMap<>();
         this.uncompletedBinlogSplits = new HashMap<>();
@@ -117,6 +118,7 @@ public class MySqlSourceReader<T>
 
     @Override
     protected MySqlSplitState initializedState(MySqlSplit split) {
+        recordEmitter.applySplit(split);
         if (split.isSnapshotSplit()) {
             return new MySqlSnapshotSplitState(split.asSnapshotSplit());
         } else {
@@ -377,7 +379,7 @@ public class MySqlSourceReader<T>
             FinishedSnapshotSplitsReportEvent reportEvent =
                     new FinishedSnapshotSplitsReportEvent(finishedOffsets);
             context.sendSourceEventToCoordinator(reportEvent);
-            LOG.debug(
+            LOG.info(
                     "Source reader {} reports offsets of finished snapshot 
splits {}.",
                     subtaskId,
                     finishedOffsets);
@@ -436,7 +438,7 @@ public class MySqlSourceReader<T>
                         binlogSplit.splitId(),
                         MySqlBinlogSplit.appendFinishedSplitInfos(
                                 binlogSplit, newAddedMetadataGroup));
-                LOG.info(
+                LOG.debug(
                         "Source reader {} fills metadata of group {} to binlog 
split",
                         subtaskId,
                         newAddedMetadataGroup.size());
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 3f0cd2a2e..7b8dfdcdb 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -563,16 +563,16 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
         final Method metricGroupMethod = 
readerContext.getClass().getMethod("metricGroup");
         metricGroupMethod.setAccessible(true);
         final MetricGroup metricGroup = (MetricGroup) 
metricGroupMethod.invoke(readerContext);
-        final RecordEmitter<SourceRecords, SourceRecord, MySqlSplitState> 
recordEmitter =
+        final MySqlRecordEmitter<SourceRecord> recordEmitter =
                 limit > 0
                         ? new MysqlLimitedRecordEmitter(
                                 new ForwardDeserializeSchema(),
-                                new MySqlSourceReaderMetrics(metricGroup),
+                                new 
MySqlSourceReaderMetrics(readerContext.metricGroup()),
                                 configuration.isIncludeSchemaChanges(),
                                 limit)
                         : new MySqlRecordEmitter<>(
                                 new ForwardDeserializeSchema(),
-                                new MySqlSourceReaderMetrics(metricGroup),
+                                new 
MySqlSourceReaderMetrics(readerContext.metricGroup()),
                                 configuration.isIncludeSchemaChanges());
         final MySqlSourceReaderContext mySqlSourceReaderContext =
                 new MySqlSourceReaderContext(readerContext);
@@ -723,8 +723,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
      * A implementation of {@link RecordEmitter} which only emit records in 
given limit number, this
      * class is used for test purpose.
      */
-    private static class MysqlLimitedRecordEmitter
-            implements RecordEmitter<SourceRecords, SourceRecord, 
MySqlSplitState> {
+    private static class MysqlLimitedRecordEmitter extends 
MySqlRecordEmitter<SourceRecord> {
 
         private static final Logger LOG = 
LoggerFactory.getLogger(MySqlRecordEmitter.class);
         private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
@@ -741,6 +740,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                 MySqlSourceReaderMetrics sourceReaderMetrics,
                 boolean includeSchemaChanges,
                 int limit) {
+            super(debeziumDeserializationSchema, sourceReaderMetrics, 
includeSchemaChanges);
             this.debeziumDeserializationSchema = debeziumDeserializationSchema;
             this.sourceReaderMetrics = sourceReaderMetrics;
             this.includeSchemaChanges = includeSchemaChanges;
@@ -766,7 +766,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
             }
         }
 
-        private void processElement(
+        protected void processElement(
                 SourceRecord element, SourceOutput<SourceRecord> output, 
MySqlSplitState splitState)
                 throws Exception {
             if (isWatermarkEvent(element)) {
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index ca349dfd2..6c929ee28 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -187,6 +187,95 @@ class YamlJobMigrationITCase extends 
PipelineTestEnvironment {
         cancelJob(newJobID);
     }
 
+    @ParameterizedTest(name = "{0} -> SNAPSHOT")
+    @EnumSource(names = {"SNAPSHOT"})
+    void 
testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion 
migrateFromVersion)
+            throws Exception {
+        TarballFetcher.fetch(jobManager, migrateFromVersion);
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+
+        LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
+
+        String content =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "  materialized.in.memory: false"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n"
+                                + "use.legacy.json.format: true\n",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MySqlContainer.MYSQL_PORT,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        4);
+        JobID jobID = submitPipelineJob(migrateFromVersion, content);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {} ", jobID);
+
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` 
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
+                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` 
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` 
STRING}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[101, 
user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[102, 
user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[103, 
user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[104, 
user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[101, 
scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[102, 
car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, 
{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[104, 
hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, 
{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[105, 
hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[106, 
hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[107, 
rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[108, 
jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, 
meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[109, 
spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}");
+        LOG.info("Snapshot stage finished successfully.");
+
+        generateIncrementalEventsPhaseOne();
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.products, before=[106, hammer, 
16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz 
carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[107, rocks, box 
of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted 
rocks, 5.1, null, null, null], op=UPDATE, meta=()}");
+        LOG.info("Incremental stage 1 finished successfully.");
+
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, 
savepointPath);
+        // Modify schema and make some data changes.
+        generateIncrementalEventsPhaseTwo();
+        JobID newJobID = submitPipelineJob(content, savepointPath, true);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", 
newJobID);
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` 
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` 
STRING}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.products, before=[107, rocks, box 
of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted 
rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[106, hammer, 
16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz 
carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+                "AddColumnEvent{tableId=%s.products, 
addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, 
existedColumnName=null}]}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[110, 
jacket, water resistent white wind breaker, 0.2, null, null, null, 1], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[111, 
scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, 
meta=()}");
+        LOG.info("Incremental stage 2 finished successfully.");
+
+        generateIncrementalEventsPhaseThree();
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.products, before=[110, jacket, 
water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, 
jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], 
op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[111, scooter, 
Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 
2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[111, scooter, 
Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, 
meta=()}");
+        cancelJob(newJobID);
+    }
+
     private void generateIncrementalEventsPhaseOne() {
         executeMySqlStatements(
                 mysqlInventoryDatabase,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 97293ab5e..61af9e370 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -50,15 +47,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -70,16 +65,11 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
 
     private static final long serialVersionUID = 1L;
 
-    /** All tables which have been sent {@link CreateTableEvent} to 
downstream. */
-    private final Set<TableId> alreadySentCreateTableEvents;
-
     private final List<TransformRule> transformRules;
     private final Map<TableId, PreTransformChangeInfo> 
preTransformChangeInfoMap;
     private final List<Tuple2<Selectors, SchemaMetadataTransform>> 
schemaMetadataTransformers;
     private final List<Tuple3<String, String, Map<String, String>>> 
udfFunctions;
-    private final boolean shouldStoreSchemasInState;
 
-    private transient ListState<byte[]> state;
     private transient List<PreTransformer> transforms;
     private transient List<UserDefinedFunctionDescriptor> udfDescriptors;
     private transient Map<TableId, PreTransformProcessor> 
preTransformProcessorMap;
@@ -91,17 +81,14 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
 
     PreTransformOperator(
             List<TransformRule> transformRules,
-            List<Tuple3<String, String, Map<String, String>>> udfFunctions,
-            boolean shouldStoreSchemasInState) {
-        this.preTransformChangeInfoMap = new ConcurrentHashMap<>();
-        this.alreadySentCreateTableEvents = new HashSet<>();
-        this.preTransformProcessorMap = new ConcurrentHashMap<>();
+            List<Tuple3<String, String, Map<String, String>>> udfFunctions) {
+        this.preTransformChangeInfoMap = new HashMap<>();
+        this.preTransformProcessorMap = new HashMap<>();
         this.schemaMetadataTransformers = new ArrayList<>();
         this.chainingStrategy = ChainingStrategy.ALWAYS;
 
         this.transformRules = transformRules;
         this.udfFunctions = udfFunctions;
-        this.shouldStoreSchemasInState = shouldStoreSchemasInState;
     }
 
     @Override
@@ -115,8 +102,6 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
                         .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, 
udf.f1, udf.f2))
                         .collect(Collectors.toList());
 
-        // Initialize data fields in advance because they might be accessed in
-        // `::initializeState` function when restoring from a previous state.
         this.transforms = new ArrayList<>();
         for (TransformRule transformRule : transformRules) {
             String tableInclusions = transformRule.getTableInclusions();
@@ -144,54 +129,17 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        if (!shouldStoreSchemasInState) {
-            // Skip schema persistency if we're in the distributed schema mode 
or the batch
-            // execution mode.
-            return;
-        }
-        OperatorStateStore stateStore = context.getOperatorStateStore();
-        ListStateDescriptor<byte[]> descriptor =
-                new ListStateDescriptor<>("originalSchemaState", byte[].class);
-        state = stateStore.getUnionListState(descriptor);
-        if (context.isRestored()) {
-            for (byte[] serializedTableInfo : state.get()) {
-                PreTransformChangeInfo stateTableChangeInfo =
-                        PreTransformChangeInfo.SERIALIZER.deserialize(
-                                PreTransformChangeInfo.SERIALIZER.getVersion(),
-                                serializedTableInfo);
-                preTransformChangeInfoMap.put(
-                        stateTableChangeInfo.getTableId(), 
stateTableChangeInfo);
-
-                CreateTableEvent restoredCreateTableEvent =
-                        new CreateTableEvent(
-                                stateTableChangeInfo.getTableId(),
-                                
stateTableChangeInfo.getPreTransformedSchema());
-                // hasAsteriskMap needs to be recalculated after restoring 
from a checkpoint.
-                cacheTransformRuleInfo(restoredCreateTableEvent);
-            }
-        }
+        // Historically, the transform operator maintained internal state so that schemas could be
+        // transformed correctly after a restart. This is no longer required since [FLINK-38045]
+        // was merged: the MySQL source now rebuilds the correct CreateTableEvents from the table
+        // schemas carried by its splits and emits them itself.
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        if (!shouldStoreSchemasInState) {
-            // Same reason in this#initializeState.
-            return;
-        }
-        state.update(
-                new ArrayList<>(
-                        preTransformChangeInfoMap.values().stream()
-                                .map(
-                                        tableChangeInfo -> {
-                                            try {
-                                                return 
PreTransformChangeInfo.SERIALIZER.serialize(
-                                                        tableChangeInfo);
-                                            } catch (IOException e) {
-                                                throw new RuntimeException(e);
-                                            }
-                                        })
-                                .collect(Collectors.toList())));
+        // Do nothing; the reason is the same as in this#initializeState.
     }
 
     @Override
@@ -204,7 +152,6 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
     public void close() throws Exception {
         super.close();
         clearOperator();
-        this.state = null;
     }
 
     @Override
@@ -239,7 +186,6 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
             // which may be different with the schema currently being 
processed.
             if 
(!preTransformProcessorMap.containsKey(createTableEvent.tableId())) {
                 output.collect(new 
StreamRecord<>(cacheCreateTable(createTableEvent)));
-                alreadySentCreateTableEvents.add(createTableEvent.tableId());
             }
         } else if (event instanceof DropTableEvent) {
             preTransformProcessorMap.remove(((DropTableEvent) 
event).tableId());
@@ -247,33 +193,16 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
         } else if (event instanceof TruncateTableEvent) {
             output.collect(new StreamRecord<>(event));
         } else if (event instanceof SchemaChangeEvent) {
-            lazilyEmitCreateTableEvent(event);
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             preTransformProcessorMap.remove(schemaChangeEvent.tableId());
             cacheChangeSchema(schemaChangeEvent)
                     .ifPresent(e -> output.collect(new StreamRecord<>(e)));
         } else if (event instanceof DataChangeEvent) {
-            lazilyEmitCreateTableEvent(event);
             output.collect(new 
StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event))));
         }
     }
 
-    /** Emit related CreateTableEvent for the first time when meeting 
ChangeEvent. */
-    private void lazilyEmitCreateTableEvent(Event event) {
-        ChangeEvent changeEvent = (ChangeEvent) event;
-        if (!alreadySentCreateTableEvents.contains(changeEvent.tableId())) {
-            PreTransformChangeInfo stateTableChangeInfo =
-                    preTransformChangeInfoMap.get(changeEvent.tableId());
-            CreateTableEvent createTableEvent =
-                    new CreateTableEvent(
-                            stateTableChangeInfo.getTableId(),
-                            stateTableChangeInfo.getPreTransformedSchema());
-            output.collect(new StreamRecord<>(createTableEvent));
-            alreadySentCreateTableEvents.add(changeEvent.tableId());
-        }
-    }
-
-    private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
+    private CreateTableEvent cacheCreateTable(CreateTableEvent event) {
         TableId tableId = event.tableId();
         Schema originalSchema = event.getSchema();
         event = transformCreateTableEvent(event);
@@ -354,6 +283,7 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
                                 tableId,
                                 transformSchemaMetaData(
                                         createTableEvent.getSchema(), 
transform.f1));
+                break;
             }
         }
 
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
index e458b6416..bb4df65a5 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
@@ -29,7 +29,6 @@ import java.util.Map;
 /** Builder of {@link PreTransformOperator}. */
 public class PreTransformOperatorBuilder {
     private final List<TransformRule> transformRules = new ArrayList<>();
-    private boolean shouldStoreSchemasInState;
 
     private final List<Tuple3<String, String, Map<String, String>>> 
udfFunctions =
             new ArrayList<>();
@@ -77,13 +76,7 @@ public class PreTransformOperatorBuilder {
         return this;
     }
 
-    public PreTransformOperatorBuilder shouldStoreSchemasInState(
-            boolean shouldStoreSchemasInState) {
-        this.shouldStoreSchemasInState = shouldStoreSchemasInState;
-        return this;
-    }
-
     public PreTransformOperator build() {
-        return new PreTransformOperator(transformRules, udfFunctions, 
shouldStoreSchemasInState);
+        return new PreTransformOperator(transformRules, udfFunctions);
     }
 }
