This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.2 by this push:
new fc752fb3c [FLINK-36184][transform] Fix transform operator swallows
schema changes from tables not present in transform rules (#3591)
fc752fb3c is described below
commit fc752fb3c8979c5d311bb0c02dc2bf6a89ad7544
Author: yuxiqian <[email protected]>
AuthorDate: Fri Aug 30 11:41:39 2024 +0800
[FLINK-36184][transform] Fix transform operator swallows schema changes
from tables not present in transform rules (#3591)
---
.../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 88 ++++++++++++++++++++++
.../operators/transform/PostTransformOperator.java | 32 +++++---
.../operators/transform/PreTransformOperator.java | 29 ++++---
3 files changed, 129 insertions(+), 20 deletions(-)
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 64e3e9c57..934007bc8 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -242,6 +242,94 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
() -> submitPipelineJob(pipelineJob, mysqlCdcJar,
valuesCdcJar, mysqlDriverJar));
}
+ /**
+ * Verifies that schema changes on a table matched by no transform rule still reach the
+ * sink when a transform block is configured (FLINK-36184). The add/alter/rename/drop
+ * column events and inserts on {@code members} are compared against the events captured
+ * by {@code taskManagerConsumer}.
+ */
+ @Test
+ public void testByDefaultTransform() throws Exception {
+ String dbName = schemaEvolveDatabase.getDatabaseName();
+
+ // The transform block below matches no captured table (only another.irrelevant),
+ // but its presence puts TransformOperator in the topology, so we can verify
that TransformOperator
+ // forwards schema changes of such "bypass" tables instead of swallowing them.
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.members\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "transform:\n"
+ + " - source-table: another.irrelevant\n"
+ + " projection: \"'irrelevant' AS tag\"\n"
+ + "\n"
+ + "pipeline:\n"
+ + " schema.change.behavior: evolve\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ dbName,
+ parallelism);
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar,
mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ validateSnapshotData(dbName, "members");
+
+ LOG.info("Starting schema evolution");
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s", MYSQL.getHost(),
MYSQL.getDatabasePort(), dbName);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+
+ waitForIncrementalStage(dbName, "members", stmt);
+
+ // triggers AddColumnEvent, then an insert that populates the new column
+ stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER
age;");
+ stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
+
+ // triggers AlterColumnTypeEvent and RenameColumnEvent
+ stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age
DOUBLE;");
+
+ // triggers RenameColumnEvent
+ stmt.execute("ALTER TABLE members RENAME COLUMN gender TO
biological_sex;");
+
+ // triggers DropColumnEvent, followed by inserts without the dropped column
+ stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
+ stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+ stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
+ }
+
+ // precise_age is DOUBLE after the CHANGE COLUMN, hence 16.0 / 17.0 in the last rows.
+ List<String> expectedTaskManagerEvents =
+ Arrays.asList(
+ "AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER,
existedColumnName=age}]}",
+ "DataChangeEvent{tableId=%s.members, before=[],
after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId=%s.members,
nameMapping={age=DOUBLE}}",
+ "RenameColumnEvent{tableId=%s.members,
nameMapping={age=precise_age}}",
+ "RenameColumnEvent{tableId=%s.members,
nameMapping={gender=biological_sex}}",
+ "DropColumnEvent{tableId=%s.members,
droppedColumnNames=[biological_sex]}",
+ "DataChangeEvent{tableId=%s.members, before=[],
after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.members, before=[],
after=[1014, Gem, 17.0], op=INSERT, meta=()}");
+
+ // Each template has a single %s (the database name); the second dbName is unused.
+ List<String> expectedTmEvents =
+ expectedTaskManagerEvents.stream()
+ .map(s -> String.format(s, dbName, dbName))
+ .collect(Collectors.toList());
+
+ validateResult(expectedTmEvents, taskManagerConsumer);
+ }
+
private void testGenericSchemaEvolution(
String behavior,
boolean mergeTable,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 30c78d202..83dd07c53 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -258,17 +258,29 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
.stream())
.map(ProjectionColumn::getColumnName)
.collect(Collectors.toSet());
- boolean hasAsterisk =
- transforms.stream()
- .filter(t -> t.getSelectors().isMatch(tableId))
- .anyMatch(
- t ->
- TransformParser.hasAsterisk(
- t.getProjection()
-
.map(TransformProjection::getProjection)
- .orElse(null)));
- hasAsteriskMap.put(tableId, hasAsterisk);
+ boolean notTransformed =
+ transforms.stream().noneMatch(t ->
t.getSelectors().isMatch(tableId));
+
+ if (notTransformed) {
+ // If this TableId isn't presented in any transform block, it
should behave like a
+ // "*" projection and should be regarded as asterisk-ful.
+ hasAsteriskMap.put(tableId, true);
+ } else {
+ boolean hasAsterisk =
+ transforms.stream()
+ .filter(t -> t.getSelectors().isMatch(tableId))
+ .anyMatch(
+ t ->
+ TransformParser.hasAsterisk(
+ t.getProjection()
+ .map(
+
TransformProjection
+
::getProjection)
+
.orElse(null)));
+
+ hasAsteriskMap.put(tableId, hasAsterisk);
+ }
projectedColumnsMap.put(
tableId,
createTableEvent.getSchema().getColumnNames().stream()
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 4da0f0b99..5e5c4c708 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -293,17 +293,26 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
.map(Column::getName)
.collect(Collectors.toSet());
- boolean hasAsterisk =
- transforms.stream()
- .filter(t -> t.getSelectors().isMatch(tableId))
- .anyMatch(
- t ->
- TransformParser.hasAsterisk(
- t.getProjection()
-
.map(TransformProjection::getProjection)
- .orElse(null)));
+ boolean notTransformed =
+ transforms.stream().noneMatch(t ->
t.getSelectors().isMatch(tableId));
- hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+ if (notTransformed) {
+ // If this TableId isn't presented in any transform block, it
should behave like a "*"
+ // projection and should be regarded as asterisk-ful.
+ hasAsteriskMap.put(tableId, true);
+ } else {
+ boolean hasAsterisk =
+ transforms.stream()
+ .filter(t -> t.getSelectors().isMatch(tableId))
+ .anyMatch(
+ t ->
+ TransformParser.hasAsterisk(
+ t.getProjection()
+
.map(TransformProjection::getProjection)
+ .orElse(null)));
+
+ hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+ }
referencedColumnsMap.put(
createTableEvent.tableId(),
createTableEvent.getSchema().getColumnNames().stream()