This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ee843e2f2 [FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler 
thread safe by blocking  subsequent schemaChangeEvent
ee843e2f2 is described below

commit ee843e2f246c300ecfeaf5cfd529693fadf2625f
Author: yuxiqian <[email protected]>
AuthorDate: Thu Aug 22 19:19:21 2024 +0800

    [FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread safe by 
blocking  subsequent schemaChangeEvent
    
    This closes #3563.
    
    Co-authored-by: Hongshun Wang <[email protected]>
---
 .../source/reader/MySqlPipelineRecordEmitter.java  |  62 ++--
 .../mysql/source/MySqlFullTypesITCase.java         |  26 +-
 .../mysql/source/MySqlPipelineITCase.java          |  30 +-
 .../mysql/testutils/MySqSourceTestUtils.java       |  19 ++
 .../cdc/pipeline/tests/MySqlToDorisE2eITCase.java  |  10 +-
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |  10 +-
 .../flink/cdc/pipeline/tests/RouteE2eITCase.java   |  35 ++-
 .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java  |  10 +-
 .../cdc/pipeline/tests/TransformE2eITCase.java     |  44 ++-
 .../flink/cdc/pipeline/tests/UdfE2eITCase.java     |   6 +-
 .../tests/utils/PipelineTestEnvironment.java       |   2 +
 .../runtime/operators/schema/SchemaOperator.java   |  74 +++--
 .../schema/coordinator/SchemaManager.java          |  96 ++++++
 .../schema/coordinator/SchemaRegistry.java         |  10 +-
 .../coordinator/SchemaRegistryRequestHandler.java  | 349 +++++++++------------
 .../schema/event/RefreshPendingListsRequest.java   |  27 --
 .../schema/event/RefreshPendingListsResponse.java  |  26 --
 .../schema/event/ReleaseUpstreamRequest.java       |  32 --
 .../event/SchemaChangeProcessingResponse.java      |   4 +-
 .../schema/event/SchemaChangeResponse.java         |  75 ++++-
 ...sponse.java => SchemaChangeResultResponse.java} |   8 +-
 .../operators/EventOperatorTestHarness.java        |   1 -
 22 files changed, 540 insertions(+), 416 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 3f7581347..909ed6c5b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
+import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -68,7 +69,7 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
 
     // Used when startup mode is not initial
     private boolean alreadySendCreateTableForBinlogSplit = false;
-    private final List<CreateTableEvent> createTableEventCache;
+    private List<CreateTableEvent> createTableEventCache;
 
     public MySqlPipelineRecordEmitter(
             DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -80,23 +81,7 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                 sourceConfig.isIncludeSchemaChanges());
         this.sourceConfig = sourceConfig;
         this.alreadySendCreateTableTables = new HashSet<>();
-        this.createTableEventCache = new ArrayList<>();
-
-        if 
(!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-            try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
-                List<TableId> capturedTableIds = listTables(jdbc, 
sourceConfig.getTableFilters());
-                for (TableId tableId : capturedTableIds) {
-                    Schema schema = getSchema(jdbc, tableId);
-                    createTableEventCache.add(
-                            new CreateTableEvent(
-                                    
org.apache.flink.cdc.common.event.TableId.tableId(
-                                            tableId.catalog(), 
tableId.table()),
-                                    schema));
-                }
-            } catch (SQLException e) {
-                throw new RuntimeException("Cannot start emitter to fetch 
table schema.", e);
-            }
-        }
+        this.createTableEventCache = generateCreateTableEvent(sourceConfig);
     }
 
     @Override
@@ -104,6 +89,8 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState 
splitState)
             throws Exception {
         if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) 
{
+            // In Snapshot phase of INITIAL startup mode, we lazily send 
CreateTableEvent to
+            // downstream to avoid checkpoint timeout.
             TableId tableId = 
splitState.asSnapshotSplitState().toMySqlSplit().getTableId();
             if (!alreadySendCreateTableTables.contains(tableId)) {
                 try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
@@ -111,11 +98,24 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState()
-                && !alreadySendCreateTableForBinlogSplit
-                && 
!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-            createTableEventCache.forEach(output::collect);
+        } else if (splitState.isBinlogSplitState() && 
!alreadySendCreateTableForBinlogSplit) {
             alreadySendCreateTableForBinlogSplit = true;
+            if 
(sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
+                // In Snapshot -> Binlog transition of INITIAL startup mode, 
ensure all table
+                // schemas have been sent to downstream. We use previously 
cached schema instead of
+                // re-requesting the latest schema because there might be some 
pending schema change events
+                // in the queue, and that may accidentally emit evolved schema 
before corresponding
+                // schema change events.
+                createTableEventCache.stream()
+                        .filter(
+                                event ->
+                                        !alreadySendCreateTableTables.contains(
+                                                
MySqlSchemaUtils.toDbzTableId(event.tableId())))
+                        .forEach(output::collect);
+            } else {
+                // In Binlog only mode, we simply emit all schemas at once.
+                createTableEventCache.forEach(output::collect);
+            }
         }
         super.processElement(element, output, splitState);
     }
@@ -233,4 +233,22 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
         }
         return mySqlAntlrDdlParser;
     }
+
+    private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig 
sourceConfig) {
+        try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+            List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+            List<TableId> capturedTableIds = listTables(jdbc, 
sourceConfig.getTableFilters());
+            for (TableId tableId : capturedTableIds) {
+                Schema schema = getSchema(jdbc, tableId);
+                createTableEventCache.add(
+                        new CreateTableEvent(
+                                
org.apache.flink.cdc.common.event.TableId.tableId(
+                                        tableId.catalog(), tableId.table()),
+                                schema));
+            }
+            return createTableEventCache;
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot start emitter to fetch table 
schema.", e);
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
index 91351dabf..2bd1ae689 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
@@ -295,8 +295,9 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
                         .executeAndCollect();
 
         // skip CreateTableEvent
-        List<Event> snapshotResults = 
MySqSourceTestUtils.fetchResults(iterator, 2);
-        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(1)).after();
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
 
         Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
recordType))
                 .isEqualTo(expectedSnapshot);
@@ -306,7 +307,8 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
             statement.execute("UPDATE precision_types SET time_6_c = null 
WHERE id = 1;");
         }
 
-        List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 
1);
+        List<Event> streamResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
         RecordData streamRecord = ((DataChangeEvent) 
streamResults.get(0)).after();
         Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
recordType))
                 .isEqualTo(expectedStreamRecord);
@@ -397,8 +399,9 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
                 };
 
         // skip CreateTableEvent
-        List<Event> snapshotResults = 
MySqSourceTestUtils.fetchResults(iterator, 2);
-        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(1)).after();
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
         Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
COMMON_TYPES))
                 .isEqualTo(expectedSnapshot);
 
@@ -412,7 +415,8 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
         expectedSnapshot[44] = 
BinaryStringData.fromString("{\"key1\":\"value1\"}");
         Object[] expectedStreamRecord = expectedSnapshot;
 
-        List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 
1);
+        List<Event> streamResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
         RecordData streamRecord = ((DataChangeEvent) 
streamResults.get(0)).after();
         Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
COMMON_TYPES))
                 .isEqualTo(expectedStreamRecord);
@@ -437,9 +441,10 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
                                 "Event-Source")
                         .executeAndCollect();
 
-        // skip CreateTableEvent
-        List<Event> snapshotResults = 
MySqSourceTestUtils.fetchResults(iterator, 2);
-        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(1)).after();
+        // skip CreateTableEvents
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
 
         Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
recordType))
                 .isEqualTo(expectedSnapshot);
@@ -450,7 +455,8 @@ public class MySqlFullTypesITCase extends 
MySqlSourceTestBase {
                     "UPDATE time_types SET time_6_c = null, timestamp_def_c = 
default WHERE id = 1;");
         }
 
-        List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 
1);
+        List<Event> streamResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
         RecordData streamRecord = ((DataChangeEvent) 
streamResults.get(0)).after();
         Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
recordType))
                 .isEqualTo(expectedStreamRecord);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 9aadcbef2..108a3d36d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -62,6 +62,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -226,12 +227,33 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                                         BinaryStringData.fromString("c-21")
                                     })));
         }
+        // In this configuration, several subtasks might emit their 
corresponding CreateTableEvent
+        // to downstream. Since it is not possible to predict how many 
CreateTableEvents we should
+        // expect, we simply filter them out from expected sets, and assert 
there's at least one.
         List<Event> actual =
-                fetchResults(events, 1 + expectedSnapshot.size() + 
expectedBinlog.size());
-        assertThat(actual.get(0)).isEqualTo(createTableEvent);
-        assertThat(actual.subList(1, 10))
+                fetchResultsExcept(
+                        events, expectedSnapshot.size() + 
expectedBinlog.size(), createTableEvent);
+        assertThat(actual.subList(0, expectedSnapshot.size()))
                 .containsExactlyInAnyOrder(expectedSnapshot.toArray(new 
Event[0]));
-        assertThat(actual.subList(10, 
actual.size())).isEqualTo(expectedBinlog);
+        assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
+                .isEqualTo(expectedBinlog);
+    }
+
+    private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, 
T sideEvent) {
+        List<T> result = new ArrayList<>(size);
+        List<T> sideResults = new ArrayList<>();
+        while (size > 0 && iter.hasNext()) {
+            T event = iter.next();
+            if (!event.equals(sideEvent)) {
+                result.add(event);
+                size--;
+            } else {
+                sideResults.add(sideEvent);
+            }
+        }
+        // Also ensure we've received at least one side event.
+        assertThat(sideResults).isNotEmpty();
+        return result;
     }
 
     @Test
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
index 0708cd8cd..a76838d66 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.cdc.connectors.mysql.testutils;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -38,6 +41,22 @@ public class MySqSourceTestUtils {
         return result;
     }
 
+    public static <T> Tuple2<List<T>, List<CreateTableEvent>> 
fetchResultsAndCreateTableEvent(
+            Iterator<T> iter, int size) {
+        List<T> result = new ArrayList<>(size);
+        List<CreateTableEvent> createTableEvents = new ArrayList<>();
+        while (size > 0 && iter.hasNext()) {
+            T event = iter.next();
+            if (event instanceof CreateTableEvent) {
+                createTableEvents.add((CreateTableEvent) event);
+            } else {
+                result.add(event);
+                size--;
+            }
+        }
+        return Tuple2.of(result, createTableEvents);
+    }
+
     public static String getServerId(int parallelism) {
         final Random random = new Random();
         int serverId = random.nextInt(100) + 5400;
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
index 0e58c5ed2..508eebdb7 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
@@ -194,12 +194,13 @@ public class MySqlToDorisE2eITCase extends 
PipelineTestEnvironment {
                                 + "  table.create.properties.replication_num: 
1\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         mysqlInventoryDatabase.getDatabaseName(),
                         DORIS.getUsername(),
-                        DORIS.getPassword());
+                        DORIS.getPassword(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path dorisCdcConnector = 
TestUtils.getResource("doris-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -324,13 +325,14 @@ public class MySqlToDorisE2eITCase extends 
PipelineTestEnvironment {
                                 + "    projection: \\*, 'fine' AS FINE\n"
                                 + "    filter: id <> 3 AND id <> 4\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         complexDataTypesDatabase.getDatabaseName(),
                         DORIS.getUsername(),
                         DORIS.getPassword(),
-                        complexDataTypesDatabase.getDatabaseName());
+                        complexDataTypesDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path dorisCdcConnector = 
TestUtils.getResource("doris-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index e340468f3..1f730be62 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -102,11 +102,12 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
                                 + "  type: values\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
-                        mysqlInventoryDatabase.getDatabaseName());
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -209,11 +210,12 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
                                 + "  type: values\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
-                        mysqlInventoryDatabase.getDatabaseName());
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 0834ce6e7..d1af8560c 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -154,11 +154,12 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                                 + "  type: values\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -248,13 +249,14 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                                 + "    sink-table: %s.ALL\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -331,13 +333,14 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                                 + "    sink-table: NEW_%s.ALPHABET\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -428,7 +431,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment 
{
                                 + "    sink-table: NEW_%s.BETAGAMM\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
@@ -436,7 +439,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment 
{
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -535,7 +539,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment 
{
                                 + "    sink-table: NEW_%s.TABLEC\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
@@ -545,7 +549,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment 
{
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -647,14 +652,15 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                                 + "    sink-table: %s.ALL\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -733,13 +739,14 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                                 + "    replace-symbol: <>\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         routeTestDatabase.getDatabaseName(),
                         routeTestDatabase.getDatabaseName(),
-                        routeTestDatabase.getDatabaseName());
+                        routeTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 14c6a5912..92c2622c9 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -240,11 +240,12 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: unexpected\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
-                        schemaEvolveDatabase.getDatabaseName());
+                        schemaEvolveDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -310,13 +311,14 @@ public class SchemaEvolveE2eITCase extends 
PipelineTestEnvironment {
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: %s\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         dbName,
                         mergeTable ? "(members|new_members)" : "members",
-                        behavior);
+                        behavior,
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 6f612ca95..ae516fc06 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -118,7 +118,7 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLEBETA\n"
                                 + "    projection: ID, VERSION\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
@@ -126,7 +126,8 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -200,13 +201,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "    filter: ID <= 1008\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -287,7 +289,7 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLEBETA\n"
                                 + "    projection: ID, CONCAT('v', VERSION) AS 
VERSION, LOWER(NAMEBETA) AS NAME\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
@@ -295,7 +297,8 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -365,13 +368,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLEBETA\n"
                                 + "    projection: \\*, CONCAT('v', VERSION) 
AS VERSION, LOWER(NAMEBETA) AS NAME\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -446,13 +450,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLEBETA\n"
                                 + "    projection: \\*, __namespace_name__ || 
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name, 
__data_event_type__ AS type\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -530,12 +535,13 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLE\\.*\n"
                                 + "    projection: \\*, ID + 1000 as UID, 
VERSION AS NEWVERSION\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -613,13 +619,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "  - source-table: %s.TABLEBETA\n"
                                 + "    projection: ID, CAST(VERSION AS DOUBLE) 
+ 100 AS VERSION, CAST(AGEBETA AS VARCHAR) || ' - ' || NAMEBETA AS IDENTIFIER\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -708,13 +715,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "    projection: ID, LOCALTIME as lcl_t, 
CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, 
CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as 
cur_dt\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1\n"
+                                + "  parallelism: %d\n"
                                 + "  local-time-zone: America/Los_Angeles",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -781,7 +789,9 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
     }
 
     boolean extractDataLines(String line) {
-        if (!line.startsWith("DataChangeEvent{")) {
+        // In multiple parallelism mode, a prefix with subTaskId (like '1> ') 
will be prepended.
+        // Should trim it before extracting data fields.
+        if (!line.startsWith("DataChangeEvent{", 3)) {
             return false;
         }
         Stream.of("before", "after")
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
index 938e2d98e..9f5b98298 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
@@ -128,7 +128,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
                                 + "    projection: ID, VERSION, answer() AS 
ANS, typeof(ID) AS TYP\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1\n"
+                                + "  parallelism: %d\n"
                                 + "  user-defined-function:\n"
                                 + "    - name: addone\n"
                                 + "      classpath: 
org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass\n"
@@ -146,6 +146,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
                         transformRenameDatabase.getDatabaseName(),
                         transformRenameDatabase.getDatabaseName(),
                         transformRenameDatabase.getDatabaseName(),
+                        parallelism,
                         language,
                         language,
                         language,
@@ -267,7 +268,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
                                 + "    projection: ID, VERSION, typeof(ID) AS 
TYP\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1\n"
+                                + "  parallelism: %d\n"
                                 + "  user-defined-function:\n"
                                 + "    - name: addone\n"
                                 + "      classpath: 
org.apache.flink.udf.examples.%s.AddOneFunctionClass\n"
@@ -281,6 +282,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
                         transformRenameDatabase.getDatabaseName(),
                         transformRenameDatabase.getDatabaseName(),
                         transformRenameDatabase.getDatabaseName(),
+                        parallelism,
                         language,
                         language,
                         language);
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 72576bf17..048e23bd7 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -67,6 +67,8 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
 
     @Parameterized.Parameter public String flinkVersion;
 
+    public Integer parallelism = 4;
+
     // 
------------------------------------------------------------------------------------------
     // Flink Variables
     // 
------------------------------------------------------------------------------------------
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index a700fd39c..1b4f50e89 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -39,20 +39,17 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -120,6 +117,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
     private final SchemaChangeBehavior schemaChangeBehavior;
 
     private transient SchemaOperatorMetrics schemaOperatorMetrics;
+    private transient int subTaskId;
 
     @VisibleForTesting
     public SchemaOperator(List<RouteRule> routingRules) {
@@ -153,6 +151,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         schemaOperatorMetrics =
                 new SchemaOperatorMetrics(
                         getRuntimeContext().getMetricGroup(), 
schemaChangeBehavior);
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     }
 
     @Override
@@ -218,17 +217,6 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                                 });
     }
 
-    @Override
-    public void initializeState(StateInitializationContext context) throws 
Exception {
-        if (context.isRestored()) {
-            // Multiple operators may appear during a restart process,
-            // only clear the pendingSchemaChanges when the first operator 
starts.
-            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                sendRequestToCoordinator(new RefreshPendingListsRequest());
-            }
-        }
-    }
-
     /**
      * This method is guaranteed to not be called concurrently with other 
methods of the operator.
      */
@@ -248,7 +236,11 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
     private void processSchemaChangeEvents(SchemaChangeEvent event)
             throws InterruptedException, TimeoutException, ExecutionException {
         TableId tableId = event.tableId();
-        LOG.info("Table {} received SchemaChangeEvent and start to be 
blocked.", tableId);
+        LOG.info(
+                "{}> Table {} received SchemaChangeEvent {} and start to be 
blocked.",
+                subTaskId,
+                tableId,
+                event);
         handleSchemaChangeEvent(tableId, event);
         // Update caches
         originalSchema.put(tableId, getLatestOriginalSchema(tableId));
@@ -412,36 +404,62 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing 
finished
+        // The request will block if another schema change event is being 
handled
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, 
tableId);
             output.collect(new StreamRecord<>(new FlushEvent(tableId)));
             List<SchemaChangeEvent> expectedSchemaChangeEvents = 
response.getSchemaChangeEvents();
             
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
 
             // The request will block until flushing finished in each sink 
writer
-            ReleaseUpstreamResponse schemaEvolveResponse = 
requestReleaseUpstream();
+            SchemaChangeResultResponse schemaEvolveResponse = 
requestSchemaChangeResult();
             List<SchemaChangeEvent> finishedSchemaChangeEvents =
                     schemaEvolveResponse.getFinishedSchemaChangeEvents();
 
             // Update evolved schema changes based on apply results
             finishedSchemaChangeEvents.forEach(e -> output.collect(new 
StreamRecord<>(e)));
+        } else if (response.isDuplicate()) {
+            LOG.info(
+                    "{}> Schema change event {} has been handled in another 
subTask already.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else if (response.isIgnored()) {
+            LOG.info(
+                    "{}> Schema change event {} has been ignored. No schema 
evolution needed.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else {
+            throw new IllegalStateException("Unexpected response status " + 
response);
         }
     }
 
     private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
+        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
+        while (true) {
+            SchemaChangeResponse response =
+                    sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            if (response.isRegistryBusy()) {
+                if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+                    LOG.info(
+                            "{}> Schema Registry is busy now, waiting for next 
request...",
+                            subTaskId);
+                    Thread.sleep(1000);
+                } else {
+                    throw new TimeoutException("TimeOut when requesting schema 
change");
+                }
+            } else {
+                return response;
+            }
+        }
     }
 
-    private ReleaseUpstreamResponse requestReleaseUpstream()
+    private SchemaChangeResultResponse requestSchemaChangeResult()
             throws InterruptedException, TimeoutException {
         CoordinationResponse coordinationResponse =
-                sendRequestToCoordinator(new ReleaseUpstreamRequest());
+                sendRequestToCoordinator(new SchemaChangeResultRequest());
         long nextRpcTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
         while (coordinationResponse instanceof SchemaChangeProcessingResponse) 
{
             if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
@@ -451,7 +469,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                 throw new TimeoutException("TimeOut when requesting release 
upstream");
             }
         }
-        return ((ReleaseUpstreamResponse) coordinationResponse);
+        return ((SchemaChangeResultResponse) coordinationResponse);
     }
 
     private <REQUEST extends CoordinationRequest, RESPONSE extends 
CoordinationResponse>
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
index b87ef5b12..60e70b7ed 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
@@ -18,11 +18,15 @@
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
@@ -39,6 +43,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -93,6 +98,97 @@ public class SchemaManager {
         return behavior;
     }
 
+    /**
+     * This function checks if the given schema change event has been applied 
already. If so, it
+     * will be ignored to avoid sending duplicate evolved schema change events 
to sink metadata
+     * applier.
+     */
+    public final boolean 
isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
+        TableId tableId = event.tableId();
+        Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
+        return Boolean.TRUE.equals(
+                SchemaChangeEventVisitor.visit(
+                        event,
+                        addColumnEvent -> {
+                            // It has not been applied if schema does not even 
exist
+                            if (!latestSchema.isPresent()) {
+                                return false;
+                            }
+                            List<Column> existedColumns = 
latestSchema.get().getColumns();
+
+                            // It has been applied only if all columns are 
present in existedColumns
+                            for (AddColumnEvent.ColumnWithPosition column :
+                                    addColumnEvent.getAddedColumns()) {
+                                if 
(!existedColumns.contains(column.getAddColumn())) {
+                                    return false;
+                                }
+                            }
+                            return true;
+                        },
+                        alterColumnTypeEvent -> {
+                            // It has not been applied if schema does not even 
exist
+                            if (!latestSchema.isPresent()) {
+                                return false;
+                            }
+                            Schema schema = latestSchema.get();
+
+                            // It has been applied only if all column types 
are set as expected
+                            for (Map.Entry<String, DataType> entry :
+                                    
alterColumnTypeEvent.getTypeMapping().entrySet()) {
+                                if 
(!schema.getColumn(entry.getKey()).isPresent()
+                                        || !schema.getColumn(entry.getKey())
+                                                .get()
+                                                .getType()
+                                                .equals(entry.getValue())) {
+                                    return false;
+                                }
+                            }
+                            return true;
+                        },
+                        createTableEvent -> {
+                            // It has been applied if such table already exists
+                            return latestSchema.isPresent();
+                        },
+                        dropColumnEvent -> {
+                            // It has not been applied if schema does not even 
exist
+                            if (!latestSchema.isPresent()) {
+                                return false;
+                            }
+                            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
+
+                            // It has been applied only if corresponding 
columns do not exist
+                            return 
dropColumnEvent.getDroppedColumnNames().stream()
+                                    .noneMatch(existedColumnNames::contains);
+                        },
+                        dropTableEvent -> {
+                            // It has been applied if such table does not exist
+                            return !latestSchema.isPresent();
+                        },
+                        renameColumnEvent -> {
+                            // It has not been applied if the schema is absent
+                            if (!latestSchema.isPresent()) {
+                                return false;
+                            }
+                            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
+
+                            // It has been applied only if all previous names 
do not exist, and all
+                            // new names already exist
+                            for (Map.Entry<String, String> entry :
+                                    
renameColumnEvent.getNameMapping().entrySet()) {
+                                if (existedColumnNames.contains(entry.getKey())
+                                        || 
!existedColumnNames.contains(entry.getValue())) {
+                                    return false;
+                                }
+                            }
+                            return true;
+                        },
+                        truncateTableEvent -> {
+                            // We have no way to ensure if a 
TruncateTableEvent has been applied
+                            // before. Just assume it's not.
+                            return false;
+                        }));
+    }
+
     public final boolean schemaExists(
             Map<TableId, SortedMap<Integer, Schema>> schemaMap, TableId 
tableId) {
         return schemaMap.containsKey(tableId) && 
!schemaMap.get(tableId).isEmpty();
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 9087ae4b3..8ea3a1f93 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -29,8 +29,6 @@ import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaReque
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaResponse;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
@@ -201,18 +199,14 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
             if (request instanceof SchemaChangeRequest) {
                 SchemaChangeRequest schemaChangeRequest = 
(SchemaChangeRequest) request;
                 return 
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
-            } else if (request instanceof ReleaseUpstreamRequest) {
-                return requestHandler.handleReleaseUpstreamRequest();
+            } else if (request instanceof SchemaChangeResultRequest) {
+                return requestHandler.getSchemaChangeResult();
             } else if (request instanceof GetEvolvedSchemaRequest) {
                 return CompletableFuture.completedFuture(
                         
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
             } else if (request instanceof GetOriginalSchemaRequest) {
                 return CompletableFuture.completedFuture(
                         
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
-            } else if (request instanceof SchemaChangeResultRequest) {
-                return requestHandler.getSchemaChangeResult();
-            } else if (request instanceof RefreshPendingListsRequest) {
-                return requestHandler.refreshPendingLists();
             } else {
                 throw new IllegalArgumentException(
                         "Unrecognized CoordinationRequest type: " + request);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index a84ee8d63..99019a6b4 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -33,41 +33,36 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.DataType;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
 import static 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
 
 /** A handler to deal with all requests and events for {@link SchemaRegistry}. 
*/
 @Internal
-@NotThreadSafe
 public class SchemaRegistryRequestHandler implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
 
@@ -81,21 +76,18 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     private final SchemaDerivation schemaDerivation;
 
     /**
-     * Not applied SchemaChangeRequest before receiving all flush success 
events for its table from
-     * sink writers.
+     * Atomic status of the current schema change request; requests are 
accepted only when IDLE.
      */
-    private final List<PendingSchemaChange> pendingSchemaChanges;
+    private final AtomicReference<RequestStatus> schemaChangeStatus;
 
-    private final List<SchemaChangeEvent> finishedSchemaChanges;
-    private final List<SchemaChangeEvent> ignoredSchemaChanges;
+    private volatile Throwable currentChangeException;
+    private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
+    private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges;
+    private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges;
 
     /** Sink writers which have sent flush success events for the request. */
     private final Set<Integer> flushedSinkWriters;
 
-    /** Status of the execution of current schema change request. */
-    private volatile boolean isSchemaChangeApplying;
-    /** Actual exception if failed to apply schema change. */
-    private volatile Throwable schemaChangeException;
     /** Executor service to execute schema change. */
     private final ExecutorService schemaChangeThreadPool;
 
@@ -107,16 +99,81 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             SchemaDerivation schemaDerivation,
             SchemaChangeBehavior schemaChangeBehavior) {
         this.metadataApplier = metadataApplier;
-        this.activeSinkWriters = new HashSet<>();
-        this.flushedSinkWriters = new HashSet<>();
-        this.pendingSchemaChanges = new LinkedList<>();
-        this.finishedSchemaChanges = new LinkedList<>();
-        this.ignoredSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
-        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
-        this.isSchemaChangeApplying = false;
         this.schemaChangeBehavior = schemaChangeBehavior;
+
+        this.activeSinkWriters = new HashSet<>();
+        this.flushedSinkWriters = new HashSet<>();
+        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+
+        this.currentDerivedSchemaChangeEvents = new ArrayList<>();
+        this.currentFinishedSchemaChanges = new ArrayList<>();
+        this.currentIgnoredSchemaChanges = new ArrayList<>();
+        this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
+    }
+
+    /**
+     * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks 
flushing.
+     *
+     * @param request the received SchemaChangeRequest
+     */
+    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
+            SchemaChangeRequest request) {
+        if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, 
RequestStatus.WAITING_FOR_FLUSH)) {
+            LOG.info(
+                    "Received schema change event request {} from table {}. 
Start to buffer requests for others.",
+                    request.getSchemaChangeEvent(),
+                    request.getTableId().toString());
+            SchemaChangeEvent event = request.getSchemaChangeEvent();
+
+            // If this schema change event has been requested by another 
subTask, ignore it.
+            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
+                LOG.info("Event {} has been addressed before, ignoring it.", 
event);
+                clearCurrentSchemaChangeRequest();
+                Preconditions.checkState(
+                        schemaChangeStatus.compareAndSet(
+                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated.");
+                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
+            }
+            schemaManager.applyOriginalSchemaChange(event);
+            List<SchemaChangeEvent> derivedSchemaChangeEvents =
+                    
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+
+            // If this schema change event is filtered out by LENIENT mode or 
merging table route
+            // strategies, ignore it.
+            if (derivedSchemaChangeEvents.isEmpty()) {
+                LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
+                clearCurrentSchemaChangeRequest();
+                Preconditions.checkState(
+                        schemaChangeStatus.compareAndSet(
+                                RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored.");
+                return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+            }
+
+            // Backfill pre-schema info for sink applying
+            derivedSchemaChangeEvents.forEach(
+                    e -> {
+                        if (e instanceof SchemaChangeEventWithPreSchema) {
+                            SchemaChangeEventWithPreSchema pe = 
(SchemaChangeEventWithPreSchema) e;
+                            if (!pe.hasPreSchema()) {
+                                schemaManager
+                                        .getLatestEvolvedSchema(pe.tableId())
+                                        .ifPresent(pe::fillPreSchema);
+                            }
+                        }
+                    });
+            currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
+            return CompletableFuture.completedFuture(
+                    
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+        } else {
+            LOG.info(
+                    "Schema Registry is busy processing a schema change 
request, could not handle request {} for now.",
+                    request);
+            return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+        }
     }
 
     /**
@@ -127,27 +184,22 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      */
     private void applySchemaChange(
             TableId tableId, List<SchemaChangeEvent> 
derivedSchemaChangeEvents) {
-        isSchemaChangeApplying = true;
-        schemaChangeException = null;
-        finishedSchemaChanges.clear();
-        ignoredSchemaChanges.clear();
-
         for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
             if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
                 if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
-                    ignoredSchemaChanges.add(changeEvent);
+                    currentIgnoredSchemaChanges.add(changeEvent);
                     continue;
                 }
             }
             if 
(!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
                 LOG.info("Ignored schema change {} to table {}.", changeEvent, 
tableId);
-                ignoredSchemaChanges.add(changeEvent);
+                currentIgnoredSchemaChanges.add(changeEvent);
             } else {
                 try {
                     metadataApplier.applySchemaChange(changeEvent);
-                    LOG.debug("Applied schema change {} to table {}.", 
changeEvent, tableId);
+                    LOG.info("Applied schema change {} to table {}.", 
changeEvent, tableId);
                     schemaManager.applyEvolvedSchemaChange(changeEvent);
-                    finishedSchemaChanges.add(changeEvent);
+                    currentFinishedSchemaChanges.add(changeEvent);
                 } catch (Throwable t) {
                     LOG.error(
                             "Failed to apply schema change {} to table {}. 
Caused by: {}",
@@ -155,7 +207,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                             tableId,
                             t);
                     if (!shouldIgnoreException(t)) {
-                        schemaChangeException = t;
+                        currentChangeException = t;
                         break;
                     } else {
                         LOG.warn(
@@ -166,74 +218,9 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 }
             }
         }
-
-        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
-        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
-            startNextSchemaChangeRequest();
-        }
-        isSchemaChangeApplying = false;
-    }
-
-    /**
-     * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks 
flushing.
-     *
-     * @param request the received SchemaChangeRequest
-     */
-    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
-            SchemaChangeRequest request) {
-        if (pendingSchemaChanges.isEmpty()) {
-            LOG.info(
-                    "Received schema change event request from table {}. Start 
to buffer requests for others.",
-                    request.getTableId().toString());
-            SchemaChangeEvent event = request.getSchemaChangeEvent();
-            if (event instanceof CreateTableEvent
-                    && 
schemaManager.originalSchemaExists(request.getTableId())) {
-                return CompletableFuture.completedFuture(
-                        wrap(new 
SchemaChangeResponse(Collections.emptyList())));
-            }
-            schemaManager.applyOriginalSchemaChange(event);
-            List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
-            derivedSchemaChangeEvents.forEach(
-                    e -> {
-                        if (e instanceof SchemaChangeEventWithPreSchema) {
-                            SchemaChangeEventWithPreSchema pe = 
(SchemaChangeEventWithPreSchema) e;
-                            if (!pe.hasPreSchema()) {
-                                schemaManager
-                                        .getLatestEvolvedSchema(pe.tableId())
-                                        .ifPresent(pe::fillPreSchema);
-                            }
-                        }
-                    });
-            CompletableFuture<CoordinationResponse> response =
-                    CompletableFuture.completedFuture(
-                            wrap(new 
SchemaChangeResponse(derivedSchemaChangeEvents)));
-            if (!derivedSchemaChangeEvents.isEmpty()) {
-                PendingSchemaChange pendingSchemaChange =
-                        new PendingSchemaChange(request, response);
-                pendingSchemaChange.derivedSchemaChangeEvents = 
derivedSchemaChangeEvents;
-                pendingSchemaChanges.add(pendingSchemaChange);
-                pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
-            }
-            return response;
-        } else {
-            LOG.info("There are already processing requests. Wait for 
processing.");
-            CompletableFuture<CoordinationResponse> response = new 
CompletableFuture<>();
-            pendingSchemaChanges.add(new PendingSchemaChange(request, 
response));
-            return response;
-        }
-    }
-
-    /** Handle the {@link ReleaseUpstreamRequest} and wait for all sink 
subtasks flushing. */
-    public CompletableFuture<CoordinationResponse> 
handleReleaseUpstreamRequest() {
-        CompletableFuture<CoordinationResponse> response =
-                pendingSchemaChanges.get(0).getResponseFuture();
-        if (response.isDone() && !isSchemaChangeApplying) {
-            startNextSchemaChangeRequest();
-        } else {
-            pendingSchemaChanges.get(0).receiveReleaseRequest();
-        }
-        return response;
+        Preconditions.checkState(
+                schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, 
RequestStatus.FINISHED),
+                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes");
     }
 
     /**
@@ -252,76 +239,33 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      * @param tableId the subtask in SchemaOperator and table that the 
FlushEvent is about
      * @param sinkSubtask the sink subtask succeed flushing
      */
-    public void flushSuccess(TableId tableId, int sinkSubtask) throws 
InterruptedException {
+    public void flushSuccess(TableId tableId, int sinkSubtask) {
         flushedSinkWriters.add(sinkSubtask);
         if (flushedSinkWriters.equals(activeSinkWriters)) {
+            Preconditions.checkState(
+                    schemaChangeStatus.compareAndSet(
+                            RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.APPLYING),
+                    "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents");
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
-            PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
             schemaChangeThreadPool.submit(
-                    () -> applySchemaChange(tableId, 
waitFlushSuccess.derivedSchemaChangeEvents));
-            Thread.sleep(1000);
-
-            if (schemaChangeException != null) {
-                throw new RuntimeException("Failed to apply schema change.", 
schemaChangeException);
-            }
-
-            if (isSchemaChangeApplying) {
-                waitFlushSuccess
-                        .getResponseFuture()
-                        .complete(wrap(new SchemaChangeProcessingResponse()));
-            } else {
-                waitFlushSuccess
-                        .getResponseFuture()
-                        .complete(wrap(new 
ReleaseUpstreamResponse(finishedSchemaChanges)));
-            }
+                    () -> applySchemaChange(tableId, 
currentDerivedSchemaChangeEvents));
         }
     }
 
-    private void startNextSchemaChangeRequest() {
-        pendingSchemaChanges.remove(0);
-        flushedSinkWriters.clear();
-        while (!pendingSchemaChanges.isEmpty()) {
-            PendingSchemaChange pendingSchemaChange = 
pendingSchemaChanges.get(0);
-            SchemaChangeRequest request = pendingSchemaChange.changeRequest;
-            if (request.getSchemaChangeEvent() instanceof CreateTableEvent
-                    && 
schemaManager.evolvedSchemaExists(request.getTableId())) {
-                pendingSchemaChange
-                        .getResponseFuture()
-                        .complete(wrap(new 
SchemaChangeResponse(Collections.emptyList())));
-                pendingSchemaChanges.remove(0);
-            } else {
-                List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                        
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
-                pendingSchemaChange
-                        .getResponseFuture()
-                        .complete(wrap(new 
SchemaChangeResponse(derivedSchemaChangeEvents)));
-                if (!derivedSchemaChangeEvents.isEmpty()) {
-                    pendingSchemaChange.derivedSchemaChangeEvents = 
derivedSchemaChangeEvents;
-                    pendingSchemaChange.startToWaitForReleaseRequest();
-                    break;
-                }
-            }
-        }
-    }
-
-    public CompletableFuture<CoordinationResponse> refreshPendingLists() {
-        pendingSchemaChanges.clear();
-        flushedSinkWriters.clear();
-        return CompletableFuture.completedFuture(wrap(new 
RefreshPendingListsResponse()));
-    }
-
     public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
-        if (schemaChangeException != null) {
-            throw new RuntimeException("Failed to apply schema change.", 
schemaChangeException);
-        }
-
-        if (isSchemaChangeApplying) {
-            return CompletableFuture.supplyAsync(() -> wrap(new 
SchemaChangeProcessingResponse()));
-        } else {
+        Preconditions.checkState(
+                !schemaChangeStatus.get().equals(RequestStatus.IDLE),
+                "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
+        if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, 
RequestStatus.IDLE)) {
+            // This request has been finished, return it and prepare for the 
next request
+            List<SchemaChangeEvent> finishedEvents = 
clearCurrentSchemaChangeRequest();
             return CompletableFuture.supplyAsync(
-                    () -> wrap(new 
ReleaseUpstreamResponse(finishedSchemaChanges)));
+                    () -> wrap(new 
SchemaChangeResultResponse(finishedEvents)));
+        } else {
+            // Still working on the schema change request; report that it is in progress
+            return CompletableFuture.supplyAsync(() -> wrap(new 
SchemaChangeProcessingResponse()));
         }
     }
 
@@ -445,57 +389,54 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
     }
 
     private boolean shouldIgnoreException(Throwable throwable) {
-
         // In IGNORE mode, will never try to apply schema change events
-        // In EVOLVE and and LENIENT mode, such failure will not be tolerated
+        // In EVOLVE and LENIENT mode, such failure will not be tolerated
         // In EXCEPTION mode, an exception will be thrown once captured
         return (throwable instanceof UnsupportedSchemaChangeEventException)
                 && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
     }
 
-    private static class PendingSchemaChange {
-        private final SchemaChangeRequest changeRequest;
-        private List<SchemaChangeEvent> derivedSchemaChangeEvents;
-        private CompletableFuture<CoordinationResponse> responseFuture;
-        private RequestStatus status;
-
-        public PendingSchemaChange(
-                SchemaChangeRequest changeRequest,
-                CompletableFuture<CoordinationResponse> responseFuture) {
-            this.changeRequest = changeRequest;
-            this.responseFuture = responseFuture;
-            this.status = RequestStatus.PENDING;
-        }
-
-        public SchemaChangeRequest getChangeRequest() {
-            return changeRequest;
-        }
-
-        public CompletableFuture<CoordinationResponse> getResponseFuture() {
-            return responseFuture;
-        }
-
-        public RequestStatus getStatus() {
-            return status;
-        }
-
-        public void startToWaitForReleaseRequest() {
-            if (!responseFuture.isDone()) {
-                throw new IllegalStateException(
-                        "Cannot start to wait for flush success before the 
SchemaChangeRequest is done.");
-            }
-            this.responseFuture = new CompletableFuture<>();
-            this.status = RequestStatus.WAIT_RELEASE_REQUEST;
-        }
-
-        public void receiveReleaseRequest() {
-            this.status = RECEIVED_RELEASE_REQUEST;
+    private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
+        if (currentChangeException != null) {
+            throw new RuntimeException("Failed to apply schema change.", 
currentChangeException);
         }
+        List<SchemaChangeEvent> finishedSchemaChanges =
+                new ArrayList<>(currentFinishedSchemaChanges);
+        flushedSinkWriters.clear();
+        currentDerivedSchemaChangeEvents.clear();
+        currentFinishedSchemaChanges.clear();
+        currentIgnoredSchemaChanges.clear();
+        currentChangeException = null;
+        return finishedSchemaChanges;
     }
 
-    enum RequestStatus {
-        PENDING,
-        WAIT_RELEASE_REQUEST,
-        RECEIVED_RELEASE_REQUEST
+    // The schema change request status transitions in the following way:
+    //
+    //      -------- B --------
+    //      |                 |
+    //      v                 |
+    //  --------           ---------------------
+    //  | IDLE | --- A --> | WAITING_FOR_FLUSH |
+    //  --------           ---------------------
+    //     ^                        |
+    //      E                       C
+    //       \                      v
+    //  ------------          ------------
+    //  | FINISHED | <-- D -- | APPLYING |
+    //  ------------          ------------
+    //
+    //  A: When a request arrives at an idle request handler.
+    //  B: When the current request is a duplicate or is ignored by LENIENT / 
routed table merging
+    // strategies.
+    //  C: When the schema registry has collected enough flush success events 
and starts to apply
+    // schema changes.
+    //  D: When schema change application finishes (successfully or with 
exceptions).
+    //  E: When the current schema change request result has been retrieved by 
SchemaOperator and the
+    // handler is ready for the next request.
+    private enum RequestStatus {
+        IDLE,
+        WAITING_FOR_FLUSH,
+        APPLYING,
+        FINISHED
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
deleted file mode 100644
index a0496c935..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-/** Request to refresh the pendingSchemaChanges of {@link 
SchemaRegistryRequestHandler}. */
-public class RefreshPendingListsRequest implements CoordinationRequest {
-
-    private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
deleted file mode 100644
index ff0221deb..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-
-/** Response to refresh the pendingSchemaChanges of {@link 
RefreshPendingListsRequest}. */
-public class RefreshPendingListsResponse implements CoordinationResponse {
-
-    private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
deleted file mode 100644
index 3da85c184..000000000
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema.event;
-
-import org.apache.flink.cdc.common.event.FlushEvent;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-
-/**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to 
request to release upstream
- * after sending {@link FlushEvent}.
- */
-public class ReleaseUpstreamRequest implements CoordinationRequest {
-
-    private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
index b679ab9e5..bf8d84f35 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 /**
- * The response for {@link SchemaChangeResultRequest} or {@link 
ReleaseUpstreamRequest} from {@link
- * SchemaRegistry} to {@link SchemaOperator} if not apply {@link 
SchemaChangeEvent} in time.
+ * The response for {@link SchemaChangeResultRequest} from {@link 
SchemaRegistry} to {@link
+ * SchemaOperator} if the {@link SchemaChangeEvent} has not been applied in time.
  */
 public class SchemaChangeProcessingResponse implements CoordinationResponse {
 
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
index 142de431e..63d57139b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -38,8 +39,44 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
      */
     private final List<SchemaChangeEvent> schemaChangeEvents;
 
-    public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) {
+    private final ResponseCode responseCode;
+
+    public static SchemaChangeResponse accepted(List<SchemaChangeEvent> 
schemaChangeEvents) {
+        return new SchemaChangeResponse(schemaChangeEvents, 
ResponseCode.ACCEPTED);
+    }
+
+    public static SchemaChangeResponse busy() {
+        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.BUSY);
+    }
+
+    public static SchemaChangeResponse duplicate() {
+        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.DUPLICATE);
+    }
+
+    public static SchemaChangeResponse ignored() {
+        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.IGNORED);
+    }
+
+    private SchemaChangeResponse(
+            List<SchemaChangeEvent> schemaChangeEvents, ResponseCode 
responseCode) {
         this.schemaChangeEvents = schemaChangeEvents;
+        this.responseCode = responseCode;
+    }
+
+    public boolean isAccepted() {
+        return ResponseCode.ACCEPTED.equals(responseCode);
+    }
+
+    public boolean isRegistryBusy() {
+        return ResponseCode.BUSY.equals(responseCode);
+    }
+
+    public boolean isDuplicate() {
+        return ResponseCode.DUPLICATE.equals(responseCode);
+    }
+
+    public boolean isIgnored() {
+        return ResponseCode.IGNORED.equals(responseCode);
     }
 
     public List<SchemaChangeEvent> getSchemaChangeEvents() {
@@ -55,11 +92,43 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
             return false;
         }
         SchemaChangeResponse response = (SchemaChangeResponse) o;
-        return schemaChangeEvents.equals(response.schemaChangeEvents);
+        return Objects.equals(schemaChangeEvents, response.schemaChangeEvents)
+                && responseCode == response.responseCode;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(schemaChangeEvents);
+        return Objects.hash(schemaChangeEvents, responseCode);
+    }
+
+    @Override
+    public String toString() {
+        return "SchemaChangeResponse{"
+                + "schemaChangeEvents="
+                + schemaChangeEvents
+                + ", responseCode="
+                + responseCode
+                + '}';
+    }
+
+    /**
+     * Schema Change Response status code.
+     *
+     * <p>- Accepted: The schema change request has been accepted 
exclusively. Any other
+     * schema change requests will be blocked.
+     *
+     * <p>- Busy: Schema registry is currently busy processing another schema 
change request.
+     *
+     * <p>- Duplicate: This schema change request has been submitted before, 
possibly by another
+     * parallel subtask.
+     *
+     * <p>- Ignored: This schema change request has been assessed, but no 
actual evolution is
+     * required. Possibly caused by LENIENT mode or merging table strategies.
+     */
+    public enum ResponseCode {
+        ACCEPTED,
+        BUSY,
+        DUPLICATE,
+        IGNORED
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
similarity index 87%
rename from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
rename to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
index bea880962..7039ef086 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java
@@ -26,10 +26,10 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} 
to {@link
+ * The response for {@link SchemaChangeResultRequest} from {@link 
SchemaRegistry} to {@link
  * SchemaOperator}.
  */
-public class ReleaseUpstreamResponse implements CoordinationResponse {
+public class SchemaChangeResultResponse implements CoordinationResponse {
 
     private static final long serialVersionUID = 1L;
 
@@ -39,7 +39,7 @@ public class ReleaseUpstreamResponse implements 
CoordinationResponse {
      */
     private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
 
-    public ReleaseUpstreamResponse(List<SchemaChangeEvent> 
finishedSchemaChangeEvents) {
+    public SchemaChangeResultResponse(List<SchemaChangeEvent> 
finishedSchemaChangeEvents) {
         this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
     }
 
@@ -63,7 +63,7 @@ public class ReleaseUpstreamResponse implements 
CoordinationResponse {
         if (object == null || getClass() != object.getClass()) {
             return false;
         }
-        ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
+        SchemaChangeResultResponse that = (SchemaChangeResultResponse) object;
         return Objects.equals(finishedSchemaChangeEvents, 
that.finishedSchemaChangeEvents);
     }
 
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 354304599..60952b424 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -163,7 +163,6 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
     }
 
     public void registerTableSchema(TableId tableId, Schema schema) {
-        schemaRegistry.handleApplyOriginalSchemaChangeEvent(new 
CreateTableEvent(tableId, schema));
         schemaRegistry.handleCoordinationRequest(
                 new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, 
schema)));
         schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new 
CreateTableEvent(tableId, schema));

Reply via email to