This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch release-3.5
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.5 by this push:
     new 9ea822418 [BP-3.5][FLINK-38393][runtime/transform] Loosen schema 
consistency check for unreachable rules (#4130)
9ea822418 is described below

commit 9ea822418232a88d72b74077aeace91ecc02834f
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Sep 22 14:33:27 2025 +0800

    [BP-3.5][FLINK-38393][runtime/transform] Loosen schema consistency check 
for unreachable rules (#4130)
    
    Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
---
 .../flink/FlinkPipelineTransformITCase.java        | 68 ++++++++++++++++++++++
 .../operators/transform/PostTransformOperator.java | 19 ++++--
 2 files changed, 83 insertions(+), 4 deletions(-)

diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 9d5c814d6..47a7ead5b 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1137,6 +1137,74 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, null, null, null, null, null, null], op=INSERT, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi 
apiVersion) {
+        Assertions.assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        apiVersion,
+                                        Arrays.asList(
+                                                new TransformDef(
+                                                        "\\.*.\\.*.mytable1",
+                                                        "*, 'rule_1_matched' 
AS rule_1_matched",
+                                                        "id > 0",
+                                                        null,
+                                                        "id",
+                                                        null,
+                                                        null,
+                                                        null),
+                                                new TransformDef(
+                                                        "\\.*.\\.*.\\.*",
+                                                        "*, 'rule_fallback' AS 
rule_fallback",
+                                                        null,
+                                                        null,
+                                                        "id",
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Trying to merge transformed schemas [columns={`id` 
INT,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, 
partitionKeys=id, options=(), columns={`id` INT,`name` STRING,`age` 
INT,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()], but 
got more than one column name views: [[id, name, age, rule_1_matched], [id, 
name, age, rule_fallback]]");
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) 
throws Exception {
+        runGenericTransformTest(
+                apiVersion,
+                Arrays.asList(
+                        new TransformDef(
+                                "\\.*.\\.*.mytable1",
+                                "*, 'rule_1_matched' AS rule_1_matched",
+                                null,
+                                null,
+                                "id",
+                                null,
+                                null,
+                                null),
+                        new TransformDef(
+                                "\\.*.\\.*.\\.*",
+                                "*, 'rule_fallback' AS rule_fallback",
+                                null,
+                                null,
+                                "id",
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` 
STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, 
meta=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, 
partitionKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, rule_fallback], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, rule_fallback], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}"));
+    }
+
     void runGenericTransformTest(
             ValuesDataSink.SinkApi sinkApi,
             List<TransformDef> transformDefs,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index a9661db2c..1439df897 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -331,7 +331,7 @@ public class PostTransformOperator extends 
AbstractStreamOperator<Event>
             }
         }
 
-        // Return original event if no transform predicate is satisfied/.
+        // Events with no matching filters satisfied won't be emitted to 
downstream.
         return Optional.empty();
     }
 
@@ -397,9 +397,20 @@ public class PostTransformOperator extends 
AbstractStreamOperator<Event>
 
     /** Obtain effective transformers based on given {@link TableId}. */
     private List<PostTransformer> getEffectiveTransformers(TableId tableId) {
-        return transformers.stream()
-                .filter(trans -> trans.getSelectors().isMatch(tableId))
-                .collect(Collectors.toList());
+        List<PostTransformer> effectiveTransformers = new ArrayList<>();
+        for (PostTransformer transformer : transformers) {
+            if (transformer.getSelectors().isMatch(tableId)) {
+                effectiveTransformers.add(transformer);
+
+                // Transform module works with "First-match" rule. If we have 
met an uncondition
+                // transform rule (without any filtering expression), then any 
following transform
+                // rule will not be effective.
+                if (!transformer.getFilter().isPresent()) {
+                    break;
+                }
+            }
+        }
+        return effectiveTransformers;
     }
 
     /**

Reply via email to