This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 10414a7904 [Fix][Transforms-V2] fix exclude filter mode logic (#9937)
10414a7904 is described below

commit 10414a790426377e9097a78a9dcc50f53149fc7f
Author: dy102 <[email protected]>
AuthorDate: Mon Oct 13 17:04:25 2025 +0900

    [Fix][Transforms-V2] fix exclude filter mode logic (#9937)
---
 .../seatunnel/e2e/transform/TestTableFilterIT.java |  12 ++
 ...table_filter_multi_table_with_exclude_mode.conf | 209 +++++++++++++++++++++
 .../transform/table/TableFilterConfig.java         |  29 +--
 .../table/TableFilterMultiCatalogTransform.java    |   2 +-
 4 files changed, 230 insertions(+), 22 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
index f4d9216f50..d4d0b545f6 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
@@ -38,4 +38,16 @@ public class TestTableFilterIT extends TestSuiteBase {
         Container.ExecResult execResult = container.executeJob("/table_filter_multi_table.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Only support for seatunnel")
+    @TestTemplate
+    public void testFilterMultiTableWithExcludeMode(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/table_filter_multi_table_with_exclude_mode.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf
new file mode 100644
index 0000000000..0c4a7bad38
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "source1"
+
+    tables_configs = [
+      {
+        row.num = 3
+        schema = {
+          table = "test.user_1"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 3
+        schema = {
+          table = "test.user_2"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 5
+        schema = {
+          table = "test.xyz"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+transform {
+  TableFilter {
+    plugin_input = "source1"
+    plugin_output = "transform_a_1"
+
+    database_pattern = "test"
+    table_pattern = "user_\\d+"
+  }
+  TableRename {
+      plugin_input = "transform_a_1"
+      plugin_output = "transform_a_2"
+
+      prefix = "table_a_"
+    }
+
+
+
+  TableFilter {
+    plugin_input = "source1"
+    plugin_output = "transform_b_1"
+
+    database_pattern = "test"
+    table_pattern = "user_\\d+"
+    pattern_mode = "EXCLUDE"
+  }
+    TableRename {
+        plugin_input = "transform_b_1"
+        plugin_output = "transform_b_2"
+
+        prefix = "table_b_"
+      }
+}
+sink {
+  Assert {
+    plugin_input = "transform_a_2"
+
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "test.table_a_user_1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 3
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 3
+              }
+            ]
+          },
+          {
+              table_path = "test.table_a_user_2"
+              row_rules = [
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 3
+                },
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 3
+                }
+              ]
+          },
+        {
+          table_path = "test.table_a_xyz"
+          row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 0
+            },
+            {
+              rule_type = MIN_ROW
+              rule_value = 0
+            }
+          ]
+        }
+        ]
+      }
+  }
+
+  Assert {
+      plugin_input = "transform_b_2"
+
+      rules =
+        {
+          tables_configs = [
+            {
+              table_path = "test.table_b_user_1"
+              row_rules = [
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 0
+                },
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 0
+                }
+              ]
+            },
+            {
+                table_path = "test.table_b_user_2"
+                row_rules = [
+                  {
+                    rule_type = MAX_ROW
+                    rule_value = 0
+                  },
+                  {
+                    rule_type = MIN_ROW
+                    rule_value = 0
+                  }
+                ]
+            },
+          {
+            table_path = "test.table_b_xyz"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 5
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 5
+              }
+            ]
+          }
+          ]
+        }
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
index 5a8a15396c..7498f46363 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
@@ -88,30 +88,17 @@ public class TableFilterConfig implements Serializable {
     @JsonAlias("pattern_mode")
     private PatternMode patternMode;
 
-    public boolean isMatch(TablePath tablePath) {
+    public boolean isIncluded(TablePath tablePath) {
         if (PatternMode.INCLUDE.equals(patternMode)) {
-            if (databasePattern != null && !tablePath.getDatabaseName().matches(databasePattern)) {
-                return false;
-            }
-            if (schemaPattern != null && !tablePath.getSchemaName().matches(schemaPattern)) {
-                return false;
-            }
-            if (tablePattern != null && !tablePath.getTableName().matches(tablePattern)) {
-                return false;
-            }
-            return true;
+            return isMatch(tablePath);
         }
+        return !isMatch(tablePath);
+    }
 
-        if (databasePattern != null && tablePath.getDatabaseName().matches(databasePattern)) {
-            return false;
-        }
-        if (schemaPattern != null && tablePath.getSchemaName().matches(schemaPattern)) {
-            return false;
-        }
-        if (tablePattern != null && tablePath.getTableName().matches(tablePattern)) {
-            return false;
-        }
-        return true;
+    private boolean isMatch(TablePath tablePath) {
+        return (databasePattern == null || tablePath.getDatabaseName().matches(databasePattern))
+                && (schemaPattern == null || tablePath.getSchemaName().matches(schemaPattern))
+                && (tablePattern == null || tablePath.getTableName().matches(tablePattern));
     }
 
     public static TableFilterConfig of(ReadonlyConfig config) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
index ab920c8d0c..8c85b7c4c6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -56,7 +56,7 @@ public class TableFilterMultiCatalogTransform extends AbstractMultiCatalogMapTra
                     TableFilterConfig.PatternMode.INCLUDE.equals(
                             tableFilterConfig.getPatternMode());
         } else {
-            include = tableFilterConfig.isMatch(table.getTablePath());
+            include = tableFilterConfig.isIncluded(table.getTablePath());
         }
         return new TableFilterTransform(include, table);
     }

Reply via email to