This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 10414a7904 [Fix][Transforms-V2] fix exclude filter mode logic (#9937)
10414a7904 is described below
commit 10414a790426377e9097a78a9dcc50f53149fc7f
Author: dy102 <[email protected]>
AuthorDate: Mon Oct 13 17:04:25 2025 +0900
[Fix][Transforms-V2] fix exclude filter mode logic (#9937)
---
.../seatunnel/e2e/transform/TestTableFilterIT.java | 12 ++
...table_filter_multi_table_with_exclude_mode.conf | 209 +++++++++++++++++++++
.../transform/table/TableFilterConfig.java | 29 +--
.../table/TableFilterMultiCatalogTransform.java | 2 +-
4 files changed, 230 insertions(+), 22 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
index f4d9216f50..d4d0b545f6 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
@@ -38,4 +38,16 @@ public class TestTableFilterIT extends TestSuiteBase {
Container.ExecResult execResult =
container.executeJob("/table_filter_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Only support for seatunnel")
+ @TestTemplate
+ public void testFilterMultiTableWithExcludeMode(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/table_filter_multi_table_with_exclude_mode.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf
new file mode 100644
index 0000000000..0c4a7bad38
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table_with_exclude_mode.conf
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "source1"
+
+ tables_configs = [
+ {
+ row.num = 3
+ schema = {
+ table = "test.user_1"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 3
+ schema = {
+ table = "test.user_2"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 5
+ schema = {
+ table = "test.xyz"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ }
+ TableRename {
+ plugin_input = "transform_a_1"
+ plugin_output = "transform_a_2"
+
+ prefix = "table_a_"
+ }
+
+
+
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_b_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ pattern_mode = "EXCLUDE"
+ }
+ TableRename {
+ plugin_input = "transform_b_1"
+ plugin_output = "transform_b_2"
+
+ prefix = "table_b_"
+ }
+}
+sink {
+ Assert {
+ plugin_input = "transform_a_2"
+
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table_a_user_1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ]
+ },
+ {
+ table_path = "test.table_a_user_2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ]
+ },
+ {
+ table_path = "test.table_a_xyz"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ }
+ ]
+ }
+ }
+
+ Assert {
+ plugin_input = "transform_b_2"
+
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table_b_user_1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ },
+ {
+ table_path = "test.table_b_user_2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ },
+ {
+ table_path = "test.table_b_xyz"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
index 5a8a15396c..7498f46363 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
@@ -88,30 +88,17 @@ public class TableFilterConfig implements Serializable {
@JsonAlias("pattern_mode")
private PatternMode patternMode;
- public boolean isMatch(TablePath tablePath) {
+ public boolean isIncluded(TablePath tablePath) {
if (PatternMode.INCLUDE.equals(patternMode)) {
- if (databasePattern != null && !tablePath.getDatabaseName().matches(databasePattern)) {
- return false;
- }
- if (schemaPattern != null && !tablePath.getSchemaName().matches(schemaPattern)) {
- return false;
- }
- if (tablePattern != null && !tablePath.getTableName().matches(tablePattern)) {
- return false;
- }
- return true;
+ return isMatch(tablePath);
}
+ return !isMatch(tablePath);
+ }
- if (databasePattern != null && tablePath.getDatabaseName().matches(databasePattern)) {
- return false;
- }
- if (schemaPattern != null && tablePath.getSchemaName().matches(schemaPattern)) {
- return false;
- }
- if (tablePattern != null && tablePath.getTableName().matches(tablePattern)) {
- return false;
- }
- return true;
+ private boolean isMatch(TablePath tablePath) {
+ return (databasePattern == null || tablePath.getDatabaseName().matches(databasePattern))
+ && (schemaPattern == null || tablePath.getSchemaName().matches(schemaPattern))
+ && (tablePattern == null || tablePath.getTableName().matches(tablePattern));
}
public static TableFilterConfig of(ReadonlyConfig config) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
index ab920c8d0c..8c85b7c4c6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -56,7 +56,7 @@ public class TableFilterMultiCatalogTransform extends AbstractMultiCatalogMapTra
TableFilterConfig.PatternMode.INCLUDE.equals(
tableFilterConfig.getPatternMode());
} else {
- include = tableFilterConfig.isMatch(table.getTablePath());
+ include = tableFilterConfig.isIncluded(table.getTablePath());
}
return new TableFilterTransform(include, table);
}