This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 803d438657a1051e1944046f703c5f57ba030b04
Author: yuxiqian <[email protected]>
AuthorDate: Thu Aug 22 20:52:06 2024 +0800

    [hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
---
 .../flink/FlinkPipelineTransformITCase.java        |  10 +-
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |  17 +--
 .../tests/SchemaEvolvingTransformE2eITCase.java    |  24 +---
 .../cdc/pipeline/tests/TransformE2eITCase.java     |   4 +-
 .../schema/coordinator/SchemaManager.java          | 152 ++++++++++-----------
 .../schema/coordinator/SchemaRegistry.java         |  14 +-
 .../coordinator/SchemaRegistryRequestHandler.java  |  60 ++++----
 7 files changed, 130 insertions(+), 151 deletions(-)

diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index c489b2e81..322a3358e 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -301,7 +301,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
 
                         // Alter column type stage
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, 
meta=()}",
@@ -387,7 +387,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
 
                         // Alter column type stage
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, 
meta=()}",
@@ -473,7 +473,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5, 
Eve, 5 -> Eve], after=[5, Eva, 5 -> Eva], op=UPDATE, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[6, Fiona, 6 -> Fiona], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6, 
Fiona, 6 -> Fiona], after=[], op=DELETE, meta=()}",
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={name=VARCHAR(17)}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[7, Gem, 7 -> Gem], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[8, Helen, 8 -> Helen], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8, 
Helen, 8 -> Helen], after=[8, Harry, 8 -> Harry], op=UPDATE, meta=()}",
@@ -559,7 +559,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[3rd, 6, Fiona, 26, 3, 6 -> Fiona], after=[], op=DELETE, meta=()}",
 
                         // Alter column type stage
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[4th, 7, Gem, 19.0, -1, 7 -> Gem], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[5th, 8, Helen, 18.0, -2, 8 -> Helen], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[5th, 8, Helen, 18.0, -2, 8 -> Helen], after=[5th, 8, Harry, 18.0, -3, 8 
-> Harry], op=UPDATE, meta=()}",
@@ -651,7 +651,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6 
-> Fiona, 3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
 
                         // Alter column type stage
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[7 -> Gem, 4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[8 -> Helen, 5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8 
-> Helen, 5th, 8, Helen, 18.0, -2], after=[8 -> Harry, 5th, 8, Harry, 18.0, 
-3], op=UPDATE, meta=()}",
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 1f730be62..c16f6de9c 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -296,22 +296,11 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
             stat.execute("ALTER TABLE products DROP COLUMN new_column;");
             stat.execute(
                     "INSERT INTO products VALUES 
(default,'evangelion','Eva',2.1728, null, null, null);"); // 114
-
-            // Test TruncateTableEvent
-            stat.execute("TRUNCATE TABLE products;");
-
-            // Test DropTableEvent. It's all over.
-            stat.execute("DROP TABLE products;");
         } catch (SQLException e) {
             LOG.error("Update table for CDC failed.", e);
             throw e;
         }
 
-        waitUntilSpecificEvent(
-                String.format(
-                        "DropTableEvent{tableId=%s.products}",
-                        mysqlInventoryDatabase.getDatabaseName()));
-
         validateResult(
                 "DataChangeEvent{tableId=%s.products, before=[106, hammer, 
16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz 
carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
                 "DataChangeEvent{tableId=%s.products, before=[107, rocks, box 
of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted 
rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
@@ -321,14 +310,12 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
                 "DataChangeEvent{tableId=%s.products, before=[110, jacket, 
water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, 
jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], 
op=UPDATE, meta=()}",
                 "DataChangeEvent{tableId=%s.products, before=[111, scooter, 
Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 
2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
                 "DataChangeEvent{tableId=%s.products, before=[111, scooter, 
Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, 
meta=()}",
-                "AlterColumnTypeEvent{tableId=%s.products, 
typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}",
+                "AlterColumnTypeEvent{tableId=%s.products, 
nameMapping={new_col=BIGINT}}",
                 "DataChangeEvent{tableId=%s.products, before=[], after=[112, 
derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, 
meta=()}",
                 "RenameColumnEvent{tableId=%s.products, 
nameMapping={new_col=new_column}}",
                 "DataChangeEvent{tableId=%s.products, before=[], after=[113, 
dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}",
                 "DropColumnEvent{tableId=%s.products, 
droppedColumnNames=[new_column]}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[114, 
evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}",
-                "TruncateTableEvent{tableId=%s.products}",
-                "DropTableEvent{tableId=%s.products}");
+                "DataChangeEvent{tableId=%s.products, before=[], after=[114, 
evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}");
     }
 
     private void validateResult(String... expectedEvents) throws Exception {
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 1d1f79f0f..a4a21c992 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -97,13 +97,11 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                 Arrays.asList(
                         "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
-                        "AlterColumnTypeEvent{tableId=%s.members, 
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
+                        "AlterColumnTypeEvent{tableId=%s.members, 
nameMapping={age=DOUBLE}}",
                         "RenameColumnEvent{tableId=%s.members, 
nameMapping={gender=biological_sex}}",
                         "DropColumnEvent{tableId=%s.members, 
droppedColumnNames=[biological_sex]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, 
meta=()}",
-                        "TruncateTableEvent{tableId=%s.members}",
-                        "DataChangeEvent{tableId=%s.members, before=[], 
after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}",
-                        "DropTableEvent{tableId=%s.members}"));
+                        "DataChangeEvent{tableId=%s.members, before=[], 
after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}"));
     }
 
     @Test
@@ -186,10 +184,9 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                 Arrays.asList(
                         "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, 
existedColumnName=null}]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}",
-                        "AlterColumnTypeEvent{tableId=%s.members, 
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
+                        "AlterColumnTypeEvent{tableId=%s.members, 
nameMapping={age=DOUBLE}}",
                         "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, 
position=LAST, existedColumnName=null}]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], 
op=INSERT, meta=()}",
-                        "TruncateTableEvent{tableId=%s.members}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, 
meta=()}"));
     }
 
@@ -204,14 +201,12 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                 Arrays.asList(
                         "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
-                        "AlterColumnTypeEvent{tableId=%s.members, 
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
+                        "AlterColumnTypeEvent{tableId=%s.members, 
nameMapping={age=DOUBLE}}",
                         "RenameColumnEvent{tableId=%s.members, 
nameMapping={gender=biological_sex}}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, 
meta=()}",
-                        "TruncateTableEvent{tableId=%s.members}",
                         "DataChangeEvent{tableId=%s.members, before=[], 
after=[1014 -> Gem, 1014, Gem, 17.0, null, 1028196, age < 20], op=INSERT, 
meta=()}"),
-                Arrays.asList(
-                        "Ignored schema change 
DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to 
table %s.members.",
-                        "Ignored schema change 
DropTableEvent{tableId=%s.members} to table %s.members."));
+                Collections.singletonList(
+                        "Ignored schema change 
DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to 
table %s.members."));
     }
 
     @Test
@@ -352,13 +347,6 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
             // triggers DropColumnEvent
             stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
             stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
-
-            // triggers TruncateTableEvent
-            stmt.execute("TRUNCATE TABLE members;");
-            stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
-
-            // triggers DropTableEvent
-            stmt.execute("DROP TABLE members;");
         }
 
         List<String> expectedTmEvents =
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 9c6130359..c1c8e8109 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -912,7 +912,7 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
         validateEvents(
                 "AddColumnEvent{tableId=%s.TABLEALPHA, 
addedColumns=[ColumnWithPosition{column=`LAST` VARCHAR(17), position=AFTER, 
existedColumnName=NAMEALPHA}]}",
                 "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[3008, 8, 8, 80, 17, Jazz, Last, id -> 3008], op=INSERT, meta=()}",
-                "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, 
typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
+                "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODENAME=DOUBLE}}",
                 "RenameColumnEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODENAME=CODE_NAME}}",
                 "RenameColumnEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODE_NAME=CODE_NAME_EX}}",
                 "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[3009, 9, 9.0, 90, 18, Keka, Finale, id -> 3009], op=INSERT, meta=()}",
@@ -1019,7 +1019,7 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                 "AddColumnEvent{tableId=%s.TABLEALPHA, 
addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, 
existedColumnName=VERSION}]}",
                 "AddColumnEvent{tableId=%s.TABLEALPHA, 
addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, 
existedColumnName=ID}]}",
                 "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008 
<- id, First, 3008, 8, 8, 80, 17, Jazz], op=INSERT, meta=()}",
-                "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, 
typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
+                "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODENAME=DOUBLE}}",
                 "RenameColumnEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODENAME=CODE_NAME}}",
                 "RenameColumnEvent{tableId=%s.TABLEALPHA, 
nameMapping={CODE_NAME=CODE_NAME_EX}}",
                 "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009 
<- id, 1st, 3009, 9, 9.0, 90, 18, Keka], op=INSERT, meta=()}",
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
index 60e70b7ed..75cc4eb32 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
@@ -19,10 +19,12 @@ package 
org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -106,87 +108,73 @@ public class SchemaManager {
     public final boolean 
isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
-        return Boolean.TRUE.equals(
-                SchemaChangeEventVisitor.visit(
-                        event,
-                        addColumnEvent -> {
-                            // It has not been applied if schema does not even 
exist
-                            if (!latestSchema.isPresent()) {
-                                return false;
-                            }
-                            List<Column> existedColumns = 
latestSchema.get().getColumns();
-
-                            // It has been applied only if all columns are 
present in existedColumns
-                            for (AddColumnEvent.ColumnWithPosition column :
-                                    addColumnEvent.getAddedColumns()) {
-                                if 
(!existedColumns.contains(column.getAddColumn())) {
-                                    return false;
-                                }
-                            }
-                            return true;
-                        },
-                        alterColumnTypeEvent -> {
-                            // It has not been applied if schema does not even 
exist
-                            if (!latestSchema.isPresent()) {
-                                return false;
-                            }
-                            Schema schema = latestSchema.get();
-
-                            // It has been applied only if all column types 
are set as expected
-                            for (Map.Entry<String, DataType> entry :
-                                    
alterColumnTypeEvent.getTypeMapping().entrySet()) {
-                                if 
(!schema.getColumn(entry.getKey()).isPresent()
-                                        || !schema.getColumn(entry.getKey())
-                                                .get()
-                                                .getType()
-                                                .equals(entry.getValue())) {
-                                    return false;
-                                }
-                            }
-                            return true;
-                        },
-                        createTableEvent -> {
-                            // It has been applied if such table already exists
-                            return latestSchema.isPresent();
-                        },
-                        dropColumnEvent -> {
-                            // It has not been applied if schema does not even 
exist
-                            if (!latestSchema.isPresent()) {
-                                return false;
-                            }
-                            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
-
-                            // It has been applied only if corresponding 
column types do not exist
-                            return 
dropColumnEvent.getDroppedColumnNames().stream()
-                                    .noneMatch(existedColumnNames::contains);
-                        },
-                        dropTableEvent -> {
-                            // It has been applied if such table does not exist
-                            return !latestSchema.isPresent();
-                        },
-                        renameColumnEvent -> {
-                            // It has been applied if such table already exists
-                            if (!latestSchema.isPresent()) {
-                                return false;
-                            }
-                            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
-
-                            // It has been applied only if all previous names 
do not exist, and all
-                            // new names already exist
-                            for (Map.Entry<String, String> entry :
-                                    
renameColumnEvent.getNameMapping().entrySet()) {
-                                if (existedColumnNames.contains(entry.getKey())
-                                        || 
!existedColumnNames.contains(entry.getValue())) {
-                                    return false;
-                                }
-                            }
-                            return true;
-                        },
-                        truncateTableEvent -> {
-                            // We have no way to ensure if a 
TruncateTableEvent has been applied
-                            // before. Just assume it's not.
-                            return false;
-                        }));
+
+        if (event instanceof AddColumnEvent) {
+            AddColumnEvent addColumnEvent = (AddColumnEvent) event;
+            if (!latestSchema.isPresent()) {
+                return false;
+            }
+            List<Column> existedColumns = latestSchema.get().getColumns();
+
+            // It has been applied only if all columns are present in 
existedColumns
+            for (AddColumnEvent.ColumnWithPosition column : 
addColumnEvent.getAddedColumns()) {
+                if (!existedColumns.contains(column.getAddColumn())) {
+                    return false;
+                }
+            }
+            return true;
+        } else if (event instanceof AlterColumnTypeEvent) {
+            AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) 
event;
+            // It has not been applied if schema does not even exist
+            if (!latestSchema.isPresent()) {
+                return false;
+            }
+            Schema schema = latestSchema.get();
+
+            // It has been applied only if all column types are set as expected
+            for (Map.Entry<String, DataType> entry :
+                    alterColumnTypeEvent.getTypeMapping().entrySet()) {
+                if (!schema.getColumn(entry.getKey()).isPresent()
+                        || !schema.getColumn(entry.getKey())
+                                .get()
+                                .getType()
+                                .equals(entry.getValue())) {
+                    return false;
+                }
+            }
+            return true;
+        } else if (event instanceof CreateTableEvent) {
+            return latestSchema.isPresent();
+        } else if (event instanceof DropColumnEvent) {
+            DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
+            if (!latestSchema.isPresent()) {
+                return false;
+            }
+            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
+
+            // It has been applied only if none of the dropped columns still 
exist
+            return dropColumnEvent.getDroppedColumnNames().stream()
+                    .noneMatch(existedColumnNames::contains);
+        } else if (event instanceof RenameColumnEvent) {
+            RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
+            // It has not been applied if schema does not even exist
+            if (!latestSchema.isPresent()) {
+                return false;
+            }
+            List<String> existedColumnNames = 
latestSchema.get().getColumnNames();
+
+            // It has been applied only if all previous names do not exist, 
and all
+            // new names already exist
+            for (Map.Entry<String, String> entry : 
renameColumnEvent.getNameMapping().entrySet()) {
+                if (existedColumnNames.contains(entry.getKey())
+                        || !existedColumnNames.contains(entry.getValue())) {
+                    return false;
+                }
+            }
+            return true;
+        } else {
+            throw new RuntimeException("Unknown schema change event: " + 
event);
+        }
     }
 
     public final boolean schemaExists(
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 8ea3a1f93..617daed92 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -104,6 +104,12 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
 
     private SchemaChangeBehavior schemaChangeBehavior;
 
+    /**
+     * Current parallelism. Use this to verify if Schema Registry has 
collected enough flush success
+     * events from sink operators.
+     */
+    private int currentParallelism;
+
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
@@ -135,7 +141,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
     public void start() throws Exception {
         LOG.info("Starting SchemaRegistry for {}.", operatorName);
         this.failedReasons.clear();
-        LOG.info("Started SchemaRegistry for {}.", operatorName);
+        this.currentParallelism = context.currentParallelism();
+        LOG.info(
+                "Started SchemaRegistry for {}. Parallelism: {}", 
operatorName, currentParallelism);
     }
 
     @Override
@@ -155,7 +163,9 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                         flushSuccessEvent.getSubtask(),
                         flushSuccessEvent.getTableId().toString());
                 requestHandler.flushSuccess(
-                        flushSuccessEvent.getTableId(), 
flushSuccessEvent.getSubtask());
+                        flushSuccessEvent.getTableId(),
+                        flushSuccessEvent.getSubtask(),
+                        currentParallelism);
             } else if (event instanceof SinkWriterRegisterEvent) {
                 requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) 
event).getSubtask());
             } else {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 99019a6b4..9262a6a90 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -25,7 +25,6 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
-import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
 import org.apache.flink.cdc.common.event.TableId;
 import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -48,11 +47,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
@@ -103,8 +102,8 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         this.schemaDerivation = schemaDerivation;
         this.schemaChangeBehavior = schemaChangeBehavior;
 
-        this.activeSinkWriters = new HashSet<>();
-        this.flushedSinkWriters = new HashSet<>();
+        this.activeSinkWriters = ConcurrentHashMap.newKeySet();
+        this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
 
         this.currentDerivedSchemaChangeEvents = new ArrayList<>();
@@ -122,7 +121,7 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
             SchemaChangeRequest request) {
         if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, 
RequestStatus.WAITING_FOR_FLUSH)) {
             LOG.info(
-                    "Received schema change event request {} from table {}. 
Start to buffer requests for others.",
+                    "Received schema change event request {} from table {}. 
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will 
be blocked.",
                     request.getSchemaChangeEvent(),
                     request.getTableId().toString());
             SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -134,7 +133,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 Preconditions.checkState(
                         schemaChangeStatus.compareAndSet(
                                 RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated.");
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was duplicated, not "
+                                + schemaChangeStatus.get());
+                LOG.info(
+                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to duplicated request.",
+                        request);
                 return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
             }
             schemaManager.applyOriginalSchemaChange(event);
@@ -149,22 +152,13 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 Preconditions.checkState(
                         schemaChangeStatus.compareAndSet(
                                 RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.IDLE),
-                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored.");
+                        "Illegal schemaChangeStatus state: should still in 
WAITING_FOR_FLUSH state if event was ignored, not "
+                                + schemaChangeStatus.get());
+                LOG.info(
+                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to 
IDLE for request {} due to ignored request.",
+                        request);
                 return 
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
             }
-
-            // Backfill pre-schema info for sink applying
-            derivedSchemaChangeEvents.forEach(
-                    e -> {
-                        if (e instanceof SchemaChangeEventWithPreSchema) {
-                            SchemaChangeEventWithPreSchema pe = 
(SchemaChangeEventWithPreSchema) e;
-                            if (!pe.hasPreSchema()) {
-                                schemaManager
-                                        .getLatestEvolvedSchema(pe.tableId())
-                                        .ifPresent(pe::fillPreSchema);
-                            }
-                        }
-                    });
             currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
             return CompletableFuture.completedFuture(
                     
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
@@ -220,7 +214,11 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
         }
         Preconditions.checkState(
                 schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, 
RequestStatus.FINISHED),
-                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes");
+                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes, not "
+                        + schemaChangeStatus.get());
+        LOG.info(
+                "SchemaChangeStatus switched from APPLYING to FINISHED for 
request {}.",
+                currentDerivedSchemaChangeEvents);
     }
 
     /**
@@ -239,13 +237,21 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
      * @param tableId the subtask in SchemaOperator and table that the 
FlushEvent is about
      * @param sinkSubtask the sink subtask succeed flushing
      */
-    public void flushSuccess(TableId tableId, int sinkSubtask) {
+    public void flushSuccess(TableId tableId, int sinkSubtask, int 
parallelism) {
         flushedSinkWriters.add(sinkSubtask);
+        if (activeSinkWriters.size() < parallelism) {
+            LOG.info(
+                    "Not all active sink writers have been registered. Current 
{}, expected {}.",
+                    activeSinkWriters.size(),
+                    parallelism);
+            return;
+        }
         if (flushedSinkWriters.equals(activeSinkWriters)) {
             Preconditions.checkState(
                     schemaChangeStatus.compareAndSet(
                             RequestStatus.WAITING_FOR_FLUSH, 
RequestStatus.APPLYING),
-                    "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents");
+                    "Illegal schemaChangeStatus state: should be 
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+                            + schemaChangeStatus);
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
@@ -259,6 +265,10 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                 !schemaChangeStatus.get().equals(RequestStatus.IDLE),
                 "Illegal schemaChangeStatus: should not be IDLE before getting 
schema change request results.");
         if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, 
RequestStatus.IDLE)) {
+            LOG.info(
+                    "SchemaChangeStatus switched from FINISHED to IDLE for 
request {}",
+                    currentDerivedSchemaChangeEvents);
+
             // This request has been finished, return it and prepare for the 
next request
             List<SchemaChangeEvent> finishedEvents = 
clearCurrentSchemaChangeRequest();
             return CompletableFuture.supplyAsync(
@@ -379,10 +389,6 @@ public class SchemaRegistryRequestHandler implements 
Closeable {
                     }
                     return events;
                 }
-            case DROP_TABLE:
-                // We don't drop any tables in Lenient mode.
-                LOG.info("A drop table event {} has been ignored in Lenient 
mode.", event);
-                return Collections.emptyList();
             default:
                 return Collections.singletonList(event);
         }


Reply via email to