This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch release-3.5 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.5 by this push: new 9ea822418 [BP-3.5][FLINK-38393][runtime/transform] Loosen schema consistency check for unreachable rules (#4130) 9ea822418 is described below commit 9ea822418232a88d72b74077aeace91ecc02834f Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Mon Sep 22 14:33:27 2025 +0800 [BP-3.5][FLINK-38393][runtime/transform] Loosen schema consistency check for unreachable rules (#4130) Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com> --- .../flink/FlinkPipelineTransformITCase.java | 68 ++++++++++++++++++++++ .../operators/transform/PostTransformOperator.java | 19 ++++-- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 9d5c814d6..47a7ead5b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1137,6 +1137,74 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null], op=INSERT, meta=()}"); } + @ParameterizedTest + @EnumSource + void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi apiVersion) { + Assertions.assertThatThrownBy( + () -> + runGenericTransformTest( + apiVersion, + Arrays.asList( + new TransformDef( + "\\.*.\\.*.mytable1", + "*, 'rule_1_matched' AS rule_1_matched", + "id > 0", + null, + "id", + null, + null, + null), + new TransformDef( + "\\.*.\\.*.\\.*", + "*, 'rule_fallback' AS rule_fallback", + null, + null, + "id", + null, + null, + null)), + Collections.emptyList())) + .rootCause() + 
.isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Trying to merge transformed schemas [columns={`id` INT,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, partitionKeys=id, options=(), columns={`id` INT,`name` STRING,`age` INT,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()], but got more than one column name views: [[id, name, age, rule_1_matched], [id, name, age, rule_fallback]]"); + } + + @ParameterizedTest + @EnumSource + void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Exception { + runGenericTransformTest( + apiVersion, + Arrays.asList( + new TransformDef( + "\\.*.\\.*.mytable1", + "*, 'rule_1_matched' AS rule_1_matched", + null, + null, + "id", + null, + null, + null), + new TransformDef( + "\\.*.\\.*.\\.*", + "*, 'rule_fallback' AS rule_fallback", + null, + null, + "id", + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, partitionKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, rule_fallback], op=INSERT, meta=()}", + 
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, rule_fallback], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}")); + } + void runGenericTransformTest( ValuesDataSink.SinkApi sinkApi, List<TransformDef> transformDefs, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index a9661db2c..1439df897 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -331,7 +331,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event> } } - // Return original event if no transform predicate is satisfied/. + // Events with no matching filters satisfied won't be emitted to downstream. return Optional.empty(); } @@ -397,9 +397,20 @@ public class PostTransformOperator extends AbstractStreamOperator<Event> /** Obtain effective transformers based on given {@link TableId}. */ private List<PostTransformer> getEffectiveTransformers(TableId tableId) { - return transformers.stream() - .filter(trans -> trans.getSelectors().isMatch(tableId)) - .collect(Collectors.toList()); + List<PostTransformer> effectiveTransformers = new ArrayList<>(); + for (PostTransformer transformer : transformers) { + if (transformer.getSelectors().isMatch(tableId)) { + effectiveTransformers.add(transformer); + + // Transform module works with "First-match" rule. If we have met an unconditional + // transform rule (without any filtering expression), then any following transform + // rule will not be effective.
+ if (!transformer.getFilter().isPresent()) { + break; + } + } + } + return effectiveTransformers; } /**