This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 06fc93956 [FLINK-36184][transform] Fix transform operator swallows schema changes from tables not present in transform rules (#3589)
06fc93956 is described below

commit 06fc9395693a78318bc3b42b76088e6d3201b79f
Author: yuxiqian <[email protected]>
AuthorDate: Fri Aug 30 11:41:05 2024 +0800

    [FLINK-36184][transform] Fix transform operator swallows schema changes from tables not present in transform rules (#3589)
---
 .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java  | 96 ++++++++++++++++++++++
 .../operators/transform/PostTransformOperator.java | 32 +++++---
 .../operators/transform/PreTransformOperator.java  | 29 ++++---
 3 files changed, 137 insertions(+), 20 deletions(-)

diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 7551add08..43ff5305b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -255,6 +255,102 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
                 () -> submitPipelineJob(pipelineJob, mysqlCdcJar, 
valuesCdcJar, mysqlDriverJar));
     }
 
+    @Test
+    public void testByDefaultTransform() throws Exception {
+        String dbName = schemaEvolveDatabase.getDatabaseName();
+
+        // We put a dummy transform block that matches nothing
+        // to ensure TransformOperator exists, so we could verify if 
TransformOperator could
+        // correctly handle such "bypass" tables with schema changes.
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.members\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: another.irrelevant\n"
+                                + "    projection: \"'irrelevant' AS tag\"\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: evolve\n"
+                                + "  parallelism: %d",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        dbName,
+                        parallelism);
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        validateSnapshotData(dbName, "members");
+
+        LOG.info("Starting schema evolution");
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s", MYSQL.getHost(), 
MYSQL.getDatabasePort(), dbName);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+                Statement stmt = conn.createStatement()) {
+
+            waitForIncrementalStage(dbName, "members", stmt);
+
+            // triggers AddColumnEvent
+            stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER 
age;");
+            stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
+
+            // triggers AlterColumnTypeEvent and RenameColumnEvent
+            stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age 
DOUBLE;");
+
+            // triggers RenameColumnEvent
+            stmt.execute("ALTER TABLE members RENAME COLUMN gender TO 
biological_sex;");
+
+            // triggers DropColumnEvent
+            stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
+            stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+
+            // triggers TruncateTableEvent
+            stmt.execute("TRUNCATE TABLE members;");
+            stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
+
+            // triggers DropTableEvent
+            stmt.execute("DROP TABLE members;");
+        }
+
+        List<String> expectedTaskManagerEvents =
+                Arrays.asList(
+                        "AddColumnEvent{tableId=%s.members, 
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, 
existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=%s.members, before=[], 
after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
+                        "AlterColumnTypeEvent{tableId=%s.members, 
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
+                        "RenameColumnEvent{tableId=%s.members, 
nameMapping={age=precise_age}}",
+                        "RenameColumnEvent{tableId=%s.members, 
nameMapping={gender=biological_sex}}",
+                        "DropColumnEvent{tableId=%s.members, 
droppedColumnNames=[biological_sex]}",
+                        "DataChangeEvent{tableId=%s.members, before=[], 
after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
+                        "TruncateTableEvent{tableId=%s.members}",
+                        "DataChangeEvent{tableId=%s.members, before=[], 
after=[1014, Gem, 17.0], op=INSERT, meta=()}",
+                        "DropTableEvent{tableId=%s.members}");
+
+        List<String> expectedTmEvents =
+                expectedTaskManagerEvents.stream()
+                        .map(s -> String.format(s, dbName, dbName))
+                        .collect(Collectors.toList());
+
+        validateResult(expectedTmEvents, taskManagerConsumer);
+    }
+
     private void testGenericSchemaEvolution(
             String behavior,
             boolean mergeTable,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 30c78d202..83dd07c53 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -258,17 +258,29 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                                                     .stream())
                             .map(ProjectionColumn::getColumnName)
                             .collect(Collectors.toSet());
-            boolean hasAsterisk =
-                    transforms.stream()
-                            .filter(t -> t.getSelectors().isMatch(tableId))
-                            .anyMatch(
-                                    t ->
-                                            TransformParser.hasAsterisk(
-                                                    t.getProjection()
-                                                            
.map(TransformProjection::getProjection)
-                                                            .orElse(null)));
 
-            hasAsteriskMap.put(tableId, hasAsterisk);
+            boolean notTransformed =
+                    transforms.stream().noneMatch(t -> 
t.getSelectors().isMatch(tableId));
+
+            if (notTransformed) {
+                // If this TableId isn't presented in any transform block, it 
should behave like a
+                // "*" projection and should be regarded as asterisk-ful.
+                hasAsteriskMap.put(tableId, true);
+            } else {
+                boolean hasAsterisk =
+                        transforms.stream()
+                                .filter(t -> t.getSelectors().isMatch(tableId))
+                                .anyMatch(
+                                        t ->
+                                                TransformParser.hasAsterisk(
+                                                        t.getProjection()
+                                                                .map(
+                                                                        
TransformProjection
+                                                                               
 ::getProjection)
+                                                                
.orElse(null)));
+
+                hasAsteriskMap.put(tableId, hasAsterisk);
+            }
             projectedColumnsMap.put(
                     tableId,
                     createTableEvent.getSchema().getColumnNames().stream()
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 5d8798068..3b050f993 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -300,17 +300,26 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                         .map(Column::getName)
                         .collect(Collectors.toSet());
 
-        boolean hasAsterisk =
-                transforms.stream()
-                        .filter(t -> t.getSelectors().isMatch(tableId))
-                        .anyMatch(
-                                t ->
-                                        TransformParser.hasAsterisk(
-                                                t.getProjection()
-                                                        
.map(TransformProjection::getProjection)
-                                                        .orElse(null)));
+        boolean notTransformed =
+                transforms.stream().noneMatch(t -> 
t.getSelectors().isMatch(tableId));
 
-        hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+        if (notTransformed) {
+            // If this TableId isn't presented in any transform block, it 
should behave like a "*"
+            // projection and should be regarded as asterisk-ful.
+            hasAsteriskMap.put(tableId, true);
+        } else {
+            boolean hasAsterisk =
+                    transforms.stream()
+                            .filter(t -> t.getSelectors().isMatch(tableId))
+                            .anyMatch(
+                                    t ->
+                                            TransformParser.hasAsterisk(
+                                                    t.getProjection()
+                                                            
.map(TransformProjection::getProjection)
+                                                            .orElse(null)));
+
+            hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+        }
         referencedColumnsMap.put(
                 createTableEvent.tableId(),
                 createTableEvent.getSchema().getColumnNames().stream()

Reply via email to