This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 803d438657a1051e1944046f703c5f57ba030b04 Author: yuxiqian <[email protected]> AuthorDate: Thu Aug 22 20:52:06 2024 +0800 [hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism --- .../flink/FlinkPipelineTransformITCase.java | 10 +- .../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 17 +-- .../tests/SchemaEvolvingTransformE2eITCase.java | 24 +--- .../cdc/pipeline/tests/TransformE2eITCase.java | 4 +- .../schema/coordinator/SchemaManager.java | 152 ++++++++++----------- .../schema/coordinator/SchemaRegistry.java | 14 +- .../coordinator/SchemaRegistryRequestHandler.java | 60 ++++---- 7 files changed, 130 insertions(+), 151 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index c489b2e81..322a3358e 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -301,7 +301,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -387,7 +387,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -473,7 +473,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5, Eve, 5 -> Eve], after=[5, Eva, 5 -> Eva], op=UPDATE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6, Fiona, 6 -> Fiona], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6, Fiona, 6 -> Fiona], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={name=VARCHAR(17)}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7, Gem, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8, Helen, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8, Helen, 8 -> Helen], after=[8, Harry, 8 -> Harry], op=UPDATE, meta=()}", @@ -559,7 +559,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 6 -> Fiona], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 8 -> Helen], after=[5th, 8, Harry, 18.0, -3, 8 -> Harry], op=UPDATE, meta=()}", @@ -651,7 +651,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6 -> Fiona, 3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7 -> Gem, 4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8 -> Helen, 5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8 -> Helen, 5th, 8, Helen, 18.0, -2], after=[8 -> Harry, 5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 1f730be62..c16f6de9c 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -296,22 +296,11 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { stat.execute("ALTER TABLE products DROP COLUMN new_column;"); stat.execute( "INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114 - - // Test TruncateTableEvent - stat.execute("TRUNCATE TABLE products;"); - - // Test DropTableEvent. It's all over. - stat.execute("DROP TABLE products;"); } catch (SQLException e) { LOG.error("Update table for CDC failed.", e); throw e; } - waitUntilSpecificEvent( - String.format( - "DropTableEvent{tableId=%s.products}", - mysqlInventoryDatabase.getDatabaseName())); - validateResult( "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", @@ -321,14 +310,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "AlterColumnTypeEvent{tableId=%s.products, nameMapping={new_col=BIGINT}}", "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}", - "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.products}", - "DropTableEvent{tableId=%s.products}"); + "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}"); } private void validateResult(String... expectedEvents) throws Exception { diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 1d1f79f0f..a4a21c992 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -97,13 +97,11 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}", - "DropTableEvent{tableId=%s.members}")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}")); } @Test @@ -186,10 +184,9 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); } @@ -204,14 +201,12 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, null, 1028196, age < 20], op=INSERT, meta=()}"), - Arrays.asList( - "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.", - "Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members.")); + Collections.singletonList( + "Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.")); } @Test @@ -352,13 +347,6 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { // triggers DropColumnEvent stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); - - // triggers TruncateTableEvent - stmt.execute("TRUNCATE TABLE members;"); - stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); - - // triggers DropTableEvent - stmt.execute("DROP TABLE members;"); } List<String> expectedTmEvents = diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 9c6130359..c1c8e8109 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -912,7 +912,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { validateEvents( "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`LAST` VARCHAR(17), position=AFTER, existedColumnName=NAMEALPHA}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008, 8, 8, 80, 17, Jazz, Last, id -> 3008], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=DOUBLE}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009, 9, 9.0, 90, 18, Keka, Finale, id -> 3009], op=INSERT, meta=()}", @@ -1019,7 +1019,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}", "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008 <- id, First, 3008, 8, 8, 80, 17, Jazz], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=DOUBLE}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009 <- id, 1st, 3009, 9, 9.0, 90, 18, Keka], op=INSERT, meta=()}", diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java index 60e70b7ed..75cc4eb32 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java @@ -19,10 +19,12 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -106,87 +108,73 @@ public class SchemaManager { public final boolean isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) { TableId tableId = event.tableId(); Optional<Schema> latestSchema = getLatestOriginalSchema(tableId); - return Boolean.TRUE.equals( - SchemaChangeEventVisitor.visit( - event, - addColumnEvent -> { - // It has not been applied if schema does not even exist - if (!latestSchema.isPresent()) { - return false; - } - List<Column> existedColumns = latestSchema.get().getColumns(); - - // It has been applied only if all columns are present in existedColumns - for (AddColumnEvent.ColumnWithPosition column : - addColumnEvent.getAddedColumns()) { - if (!existedColumns.contains(column.getAddColumn())) { - return false; - } - } - return true; - }, - alterColumnTypeEvent -> { - // It has not been applied if schema does not even exist - if (!latestSchema.isPresent()) { - return false; - } - Schema schema = latestSchema.get(); - - // It has been applied only if all column types are set as expected - for (Map.Entry<String, DataType> entry : - alterColumnTypeEvent.getTypeMapping().entrySet()) { - if (!schema.getColumn(entry.getKey()).isPresent() - || !schema.getColumn(entry.getKey()) - .get() - .getType() - .equals(entry.getValue())) { - return false; - } - } - return true; - }, - createTableEvent -> { - // It has been applied if such table already exists - return latestSchema.isPresent(); - }, - dropColumnEvent -> { - // It has not been applied if schema does not even exist - if (!latestSchema.isPresent()) { - return false; - } - List<String> existedColumnNames = latestSchema.get().getColumnNames(); - - // It has been applied only if corresponding column types do not exist - return dropColumnEvent.getDroppedColumnNames().stream() - .noneMatch(existedColumnNames::contains); - }, - dropTableEvent -> { - // It has been applied if such table does not exist - return !latestSchema.isPresent(); - }, - renameColumnEvent -> { - // It has been applied if such table already exists - if (!latestSchema.isPresent()) { - return false; - } - List<String> existedColumnNames = latestSchema.get().getColumnNames(); - - // It has been applied only if all previous names do not exist, and all - // new names already exist - for (Map.Entry<String, String> entry : - renameColumnEvent.getNameMapping().entrySet()) { - if (existedColumnNames.contains(entry.getKey()) - || !existedColumnNames.contains(entry.getValue())) { - return false; - } - } - return true; - }, - truncateTableEvent -> { - // We have no way to ensure if a TruncateTableEvent has been applied - // before. Just assume it's not. - return false; - })); + + if (event instanceof AddColumnEvent) { + AddColumnEvent addColumnEvent = (AddColumnEvent) event; + if (!latestSchema.isPresent()) { + return false; + } + List<Column> existedColumns = latestSchema.get().getColumns(); + + // It has been applied only if all columns are present in existedColumns + for (AddColumnEvent.ColumnWithPosition column : addColumnEvent.getAddedColumns()) { + if (!existedColumns.contains(column.getAddColumn())) { + return false; + } + } + return true; + } else if (event instanceof AlterColumnTypeEvent) { + AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event; + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + Schema schema = latestSchema.get(); + + // It has been applied only if all column types are set as expected + for (Map.Entry<String, DataType> entry : + alterColumnTypeEvent.getTypeMapping().entrySet()) { + if (!schema.getColumn(entry.getKey()).isPresent() + || !schema.getColumn(entry.getKey()) + .get() + .getType() + .equals(entry.getValue())) { + return false; + } + } + return true; + } else if (event instanceof CreateTableEvent) { + return latestSchema.isPresent(); + } else if (event instanceof DropColumnEvent) { + DropColumnEvent dropColumnEvent = (DropColumnEvent) event; + if (!latestSchema.isPresent()) { + return false; + } + List<String> existedColumnNames = latestSchema.get().getColumnNames(); + + // It has been applied only if corresponding column types do not exist + return dropColumnEvent.getDroppedColumnNames().stream() + .noneMatch(existedColumnNames::contains); + } else if (event instanceof RenameColumnEvent) { + RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event; + // It has been applied if such table already exists + if (!latestSchema.isPresent()) { + return false; + } + List<String> existedColumnNames = latestSchema.get().getColumnNames(); + + // It has been applied only if all previous names do not exist, and all + // new names already exist + for (Map.Entry<String, String> entry : renameColumnEvent.getNameMapping().entrySet()) { + if (existedColumnNames.contains(entry.getKey()) + || !existedColumnNames.contains(entry.getValue())) { + return false; + } + } + return true; + } else { + throw new RuntimeException("Unknown schema change event: " + event); + } } public final boolean schemaExists( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 8ea3a1f93..617daed92 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -104,6 +104,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH private SchemaChangeBehavior schemaChangeBehavior; + /** + * Current parallelism. Use this to verify if Schema Registry has collected enough flush success + * events from sink operators. + */ + private int currentParallelism; + public SchemaRegistry( String operatorName, OperatorCoordinator.Context context, @@ -135,7 +141,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH public void start() throws Exception { LOG.info("Starting SchemaRegistry for {}.", operatorName); this.failedReasons.clear(); - LOG.info("Started SchemaRegistry for {}.", operatorName); + this.currentParallelism = context.currentParallelism(); + LOG.info( + "Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism); } @Override @@ -155,7 +163,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH flushSuccessEvent.getSubtask(), flushSuccessEvent.getTableId().toString()); requestHandler.flushSuccess( - flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); + flushSuccessEvent.getTableId(), + flushSuccessEvent.getSubtask(), + currentParallelism); } else if (event instanceof SinkWriterRegisterEvent) { requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); } else { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 99019a6b4..9262a6a90 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -25,7 +25,6 @@ import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; -import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; @@ -48,11 +47,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -103,8 +102,8 @@ public class SchemaRegistryRequestHandler implements Closeable { this.schemaDerivation = schemaDerivation; this.schemaChangeBehavior = schemaChangeBehavior; - this.activeSinkWriters = new HashSet<>(); - this.flushedSinkWriters = new HashSet<>(); + this.activeSinkWriters = ConcurrentHashMap.newKeySet(); + this.flushedSinkWriters = ConcurrentHashMap.newKeySet(); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); this.currentDerivedSchemaChangeEvents = new ArrayList<>(); @@ -122,7 +121,7 @@ public class SchemaRegistryRequestHandler implements Closeable { SchemaChangeRequest request) { if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) { LOG.info( - "Received schema change event request {} from table {}. Start to buffer requests for others.", + "Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.", request.getSchemaChangeEvent(), request.getTableId().toString()); SchemaChangeEvent event = request.getSchemaChangeEvent(); @@ -134,7 +133,11 @@ public class SchemaRegistryRequestHandler implements Closeable { Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated."); + "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", + request); return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate())); } schemaManager.applyOriginalSchemaChange(event); @@ -149,22 +152,13 @@ public class SchemaRegistryRequestHandler implements Closeable { Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored."); + "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", + request); return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); } - - // Backfill pre-schema info for sink applying - derivedSchemaChangeEvents.forEach( - e -> { - if (e instanceof SchemaChangeEventWithPreSchema) { - SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e; - if (!pe.hasPreSchema()) { - schemaManager - .getLatestEvolvedSchema(pe.tableId()) - .ifPresent(pe::fillPreSchema); - } - } - }); currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); return CompletableFuture.completedFuture( wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); @@ -220,7 +214,11 @@ public class SchemaRegistryRequestHandler implements Closeable { } Preconditions.checkState( schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED), - "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes"); + "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", + currentDerivedSchemaChangeEvents); } /** @@ -239,13 +237,21 @@ public class SchemaRegistryRequestHandler implements Closeable { * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */ - public void flushSuccess(TableId tableId, int sinkSubtask) { + public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) { flushedSinkWriters.add(sinkSubtask); + if (activeSinkWriters.size() < parallelism) { + LOG.info( + "Not all active sink writers have been registered. Current {}, expected {}.", + activeSinkWriters.size(), + parallelism); + return; + } if (flushedSinkWriters.equals(activeSinkWriters)) { Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING), - "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents"); + "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + + schemaChangeStatus); LOG.info( "All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString()); @@ -259,6 +265,10 @@ public class SchemaRegistryRequestHandler implements Closeable { !schemaChangeStatus.get().equals(RequestStatus.IDLE), "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) { + LOG.info( + "SchemaChangeStatus switched from FINISHED to IDLE for request {}", + currentDerivedSchemaChangeEvents); + // This request has been finished, return it and prepare for the next request List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest(); return CompletableFuture.supplyAsync( @@ -379,10 +389,6 @@ public class SchemaRegistryRequestHandler implements Closeable { } return events; } - case DROP_TABLE: - // We don't drop any tables in Lenient mode. - LOG.info("A drop table event {} has been ignored in Lenient mode.", event); - return Collections.emptyList(); default: return Collections.singletonList(event); }
