This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4ba66372d0 [Feature][Transform-V2 Filter] support exclude columns in
the filter transform (#6960)
4ba66372d0 is described below
commit 4ba66372d03ee08457781ec43628192fc2a9ed0f
Author: litiliu <[email protected]>
AuthorDate: Thu Jun 13 18:47:12 2024 +0800
[Feature][Transform-V2 Filter] support exclude columns in the filter
transform (#6960)
---
docs/en/transform-v2/filter.md | 35 +++-
docs/zh/transform-v2/filter.md | 31 ++-
.../exception/TransformCommonErrorCode.java | 2 +-
.../transform/filter/FilterFieldTransform.java | 69 ++++--
.../filter/FilterFieldTransformConfig.java | 14 +-
.../filter/FilterFieldTransformFactory.java | 6 +-
.../transform/exception/TransformErrorTest.java | 2 +-
.../transform/filter/FilterFieldTransformTest.java | 231 +++++++++++++++++++++
8 files changed, 355 insertions(+), 35 deletions(-)
diff --git a/docs/en/transform-v2/filter.md b/docs/en/transform-v2/filter.md
index 66c30f836b..f9f28b8398 100644
--- a/docs/en/transform-v2/filter.md
+++ b/docs/en/transform-v2/filter.md
@@ -8,13 +8,20 @@ Filter the field.
## Options
-| name | type | required | default value |
-|--------|-------|----------|---------------|
-| fields | array | yes | |
+| name | type | required | default value |
+|----------------|-------|----------|---------------|
+| include_fields | array | no | |
+| exclude_fields | array | no | |
-### fields [array]
+Note: you must set exactly one of the `include_fields` and `exclude_fields`
properties.
-The list of fields that need to be kept. Fields not in the list will be deleted
+### include_fields [array]
+
+The list of fields that need to be kept. Fields not in the list will be
deleted.
+
+### exclude_fields [array]
+
+The list of fields that need to be deleted. Fields not in the list will be
kept.
### common options [string]
@@ -31,18 +38,32 @@ The data read from source is a table like this:
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |
-We want to delete field `age`, we can add `Filter` Transform like this
+If we want to keep only the fields `name` and `card`, we can add a `Filter`
Transform like below:
+
+```
+transform {
+ Filter {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ include_fields = [name, card]
+ }
+}
+```
+
+Or we can delete the field named `age` by adding a `Filter` Transform with the
`exclude_fields` option set, like below:
```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
- fields = [name, card]
+ exclude_fields = [age]
}
}
```
+Using `exclude_fields` is useful when you want to delete a small number of fields from a
table that has a large number of fields.
+
Then the data in result table `fake1` will look like this
| name | card |
diff --git a/docs/zh/transform-v2/filter.md b/docs/zh/transform-v2/filter.md
index 706a72ead1..1f02c999a3 100644
--- a/docs/zh/transform-v2/filter.md
+++ b/docs/zh/transform-v2/filter.md
@@ -8,14 +8,21 @@
## 属性
-| 名称 | 类型 | 是否必须 | 默认值 |
-|--------|-------|------|-----|
-| fields | array | yes | |
+| 名称 | 类型 | 是否必须 | 默认值 |
+|----------------|-------|------|-----|
+| include_fields | array | no | |
+| exclude_fields | array | no | |
-### fields [array]
+### include_fields [array]
需要保留的字段列表。不在列表中的字段将被删除。
+### exclude_fields [array]
+
+需要删除的字段列表。不在列表中的字段将被保留。
+
+注意,`include_fields` 和 `exclude_fields` 两个属性中,必须设置一个且只能设置一个
+
### common options [string]
转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情
@@ -31,14 +38,26 @@
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |
-我们想要删除字段 `age`,我们可以像这样添加 `Filter` 转换
+我们想要保留字段 `name`, `card`,我们可以像这样添加 `Filter` 转换:
+
+```
+transform {
+ Filter {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ include_fields = [name, card]
+ }
+}
+```
+
+我们也可以通过删除字段 `age` 来实现, 我们可以添加一个 `Filter` 转换,并设置exclude_fields:
```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
- fields = [name, card]
+ exclude_fields = [age]
}
}
```
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index 4a5eea66c7..dc5008ec04 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.transform.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-enum TransformCommonErrorCode implements SeaTunnelErrorCode {
+public enum TransformCommonErrorCode implements SeaTunnelErrorCode {
INPUT_FIELD_NOT_FOUND(
"TRANSFORM_COMMON-01",
"The input field '<field>' of '<transform>' transform not found in
upstream schema"),
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index b18aed0542..ace54a5ec9 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.transform.filter;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -36,21 +38,40 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Slf4j
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
public static final String PLUGIN_NAME = "Filter";
- private int[] inputValueIndex;
- private final List<String> fields;
+
+ private int[] inputValueIndexList;
+
+ private final List<String> includeFields;
+ private final List<String> excludeFields;
public FilterFieldTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable
catalogTable) {
super(catalogTable);
SeaTunnelRowType seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
- fields = config.get(FilterFieldTransformConfig.KEY_FIELDS);
+ includeFields = config.get(FilterFieldTransformConfig.INCLUDE_FIELDS);
+ excludeFields = config.get(FilterFieldTransformConfig.EXCLUDE_FIELDS);
+ // exactly one of include_fields and exclude_fields must be set
+ ConfigValidator.of(config)
+ .validate(
+ OptionRule.builder()
+ .exclusive(
+
FilterFieldTransformConfig.INCLUDE_FIELDS,
+
FilterFieldTransformConfig.EXCLUDE_FIELDS)
+ .build());
List<String> canNotFoundFields =
- fields.stream()
+ Stream.concat(
+ Optional.ofNullable(includeFields).orElse(new
ArrayList<>())
+ .stream(),
+ Optional.ofNullable(excludeFields).orElse(new
ArrayList<>())
+ .stream())
.filter(field -> seaTunnelRowType.indexOf(field,
false) == -1)
.collect(Collectors.toList());
@@ -67,10 +88,9 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
- // todo reuse array container if not remove fields
- Object[] values = new Object[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- values[i] = inputRow.getField(inputValueIndex[i]);
+ Object[] values = new Object[inputValueIndexList.length];
+ for (int i = 0; i < inputValueIndexList.length; i++) {
+ values[i] = inputRow.getField(inputValueIndexList[i]);
}
SeaTunnelRow outputRow = new SeaTunnelRow(values);
outputRow.setRowKind(inputRow.getRowKind());
@@ -85,15 +105,34 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
SeaTunnelRowType seaTunnelRowType =
inputCatalogTable.getTableSchema().toPhysicalRowDataType();
- inputValueIndex = new int[fields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
List<Column> inputColumns =
inputCatalogTable.getTableSchema().getColumns();
- for (int i = 0; i < fields.size(); i++) {
- String field = fields.get(i);
- int inputFieldIndex = seaTunnelRowType.indexOf(field);
- inputValueIndex[i] = inputFieldIndex;
- outputColumns.add(inputColumns.get(inputFieldIndex).copy());
- outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
+ // include
+ if (Objects.nonNull(includeFields)) {
+ inputValueIndexList = new int[includeFields.size()];
+ for (int i = 0; i < includeFields.size(); i++) {
+ String fieldName = includeFields.get(i);
+ int inputFieldIndex = seaTunnelRowType.indexOf(fieldName);
+ inputValueIndexList[i] = inputFieldIndex;
+ outputColumns.add(inputColumns.get(inputFieldIndex).copy());
+
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
+ }
+ }
+
+ // exclude
+ if (Objects.nonNull(excludeFields)) {
+ inputValueIndexList = new int[inputColumns.size() -
excludeFields.size()];
+ int index = 0;
+ for (int i = 0; i < inputColumns.size(); i++) {
+ // if the field is not in excludeFields, then add it to the
outputColumns
+ if (!excludeFields.contains(inputColumns.get(i).getName())) {
+ String fieldName = inputColumns.get(i).getName();
+ int inputFieldIndex = seaTunnelRowType.indexOf(fieldName);
+ inputValueIndexList[index++] = inputFieldIndex;
+ outputColumns.add(inputColumns.get(i).copy());
+ outputFieldNames.add(inputColumns.get(i).getName());
+ }
+ }
}
List<ConstraintKey> outputConstraintKeys =
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
index adf7cca803..ebffe554dd 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java
@@ -29,10 +29,16 @@ import java.util.List;
@Getter
@Setter
public class FilterFieldTransformConfig implements Serializable {
- public static final Option<List<String>> KEY_FIELDS =
- Options.key("fields")
+ public static final Option<List<String>> INCLUDE_FIELDS =
+ Options.key("include_fields")
.listType()
.noDefaultValue()
- .withDescription(
- "The list of fields that need to be kept. Fields
not in the list will be deleted");
+ .withDescription("The list of fields that need to be
kept.")
+ .withFallbackKeys("fields");
+
+ public static final Option<List<String>> EXCLUDE_FIELDS =
+ Options.key("exclude_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The list of fields that need to be
deleted");
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index f562a7cc28..e8a63275fa 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -37,7 +37,11 @@ public class FilterFieldTransformFactory implements
TableTransformFactory {
@Override
public OptionRule optionRule() {
- return
OptionRule.builder().required(FilterFieldTransformConfig.KEY_FIELDS).build();
+ return OptionRule.builder()
+ .exclusive(
+ FilterFieldTransformConfig.INCLUDE_FIELDS,
+ FilterFieldTransformConfig.EXCLUDE_FIELDS)
+ .build();
}
@Override
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
index c305511809..772c913c74 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
@@ -153,7 +153,7 @@ public class TransformErrorTest {
new HashMap<String, Object>() {
{
put(
-
FilterFieldTransformConfig.KEY_FIELDS.key(),
+
FilterFieldTransformConfig.INCLUDE_FIELDS.key(),
new ArrayList<String>() {
{
add("age");
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/filter/FilterFieldTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/filter/FilterFieldTransformTest.java
new file mode 100644
index 0000000000..cc1719cc0a
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/filter/FilterFieldTransformTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.filter;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class FilterFieldTransformTest {
+
+ static List<String> filterKeys = Arrays.asList("key3", "key2");
+ static CatalogTable catalogTable;
+ static Object[] values;
+
+ @BeforeAll
+ static void setUp() {
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "key1",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key2",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key3",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key4",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key5",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "comment");
+ values = new Object[] {"value1", "value2", "value3", "value4",
"value5"};
+ SeaTunnelRow inputRow = new SeaTunnelRow(values);
+ }
+
+ @Test
+ void testConfig() {
+ // test both not set
+ try {
+ new FilterFieldTransform(ReadonlyConfig.fromMap(new HashMap<>()),
catalogTable);
+ } catch (Exception e) {
+ Assertions.assertEquals(
+ "ErrorCode:[API-02], ErrorDescription:[Option item
validate failed] - There are unconfigured options, these
options('include_fields', 'exclude_fields') are mutually exclusive, allowing
only one set(\"[] for a set\") of options to be configured.",
+ e.getMessage());
+ }
+
+ // test both include and exclude set
+ try {
+ new FilterFieldTransform(
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+
FilterFieldTransformConfig.INCLUDE_FIELDS.key(),
+ filterKeys);
+ put(
+
FilterFieldTransformConfig.EXCLUDE_FIELDS.key(),
+ filterKeys);
+ }
+ }),
+ catalogTable);
+ } catch (Exception e) {
+ Assertions.assertEquals(
+ "ErrorCode:[API-02], ErrorDescription:[Option item
validate failed] - These options('include_fields', 'exclude_fields') are
mutually exclusive, allowing only one set(\"[] for a set\") of options to be
configured.",
+ e.getMessage());
+ }
+
+ // no exception should be thrown now
+ new FilterFieldTransform(
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+
put(FilterFieldTransformConfig.INCLUDE_FIELDS.key(), filterKeys);
+ }
+ }),
+ catalogTable);
+
+ new FilterFieldTransform(
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+
put(FilterFieldTransformConfig.EXCLUDE_FIELDS.key(), filterKeys);
+ }
+ }),
+ catalogTable);
+ }
+
+ @Test
+ void testInclude() {
+ // default include
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FilterFieldTransformConfig.INCLUDE_FIELDS.key(),
filterKeys);
+
+ FilterFieldTransform filterFieldTransform =
+ new FilterFieldTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ // test output schema
+ TableSchema resultSchema = filterFieldTransform.transformTableSchema();
+ Assertions.assertNotNull(resultSchema);
+ Assertions.assertEquals(filterKeys.size(),
resultSchema.getColumns().size());
+ for (int i = 0; i < resultSchema.getColumns().size(); i++) {
+ Assertions.assertEquals(filterKeys.get(i),
resultSchema.getColumns().get(i).getName());
+ }
+
+ // test output row
+ SeaTunnelRow input = new SeaTunnelRow(values);
+ SeaTunnelRow output = filterFieldTransform.transformRow(input);
+ Assertions.assertNotNull(output);
+ Assertions.assertEquals(filterKeys.size(), output.getFields().length);
+ for (int i = 0; i < resultSchema.getFieldNames().length; i++) {
+ Integer originalIndex =
+ catalogTable
+ .getTableSchema()
+ .toPhysicalRowDataType()
+ .indexOf(resultSchema.getFieldNames()[i]);
+ // test the row's field value
+ Assertions.assertEquals(input.getFields()[originalIndex],
output.getFields()[i]);
+ }
+ }
+
+ @Test
+ void testExclude() {
+ // exclude
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FilterFieldTransformConfig.EXCLUDE_FIELDS.key(),
filterKeys);
+ FilterFieldTransform filterFieldTransform =
+ new FilterFieldTransform(ReadonlyConfig.fromMap(configMap),
catalogTable);
+
+ // test output schema
+ TableSchema resultSchema = filterFieldTransform.transformTableSchema();
+ Assertions.assertNotNull(resultSchema);
+ Assertions.assertEquals(
+ catalogTable.getTableSchema().getColumns().size() -
filterKeys.size(),
+ resultSchema.getColumns().size());
+ for (int i = 0; i <
catalogTable.getTableSchema().getFieldNames().length; i++) {
+ if
(!filterKeys.contains(catalogTable.getTableSchema().getFieldNames()[i])) {
+ int finalI = i;
+ Assertions.assertTrue(
+ resultSchema.getColumns().stream()
+ .anyMatch(
+ column ->
+ column.getName()
+ .equals(
+
catalogTable.getTableSchema()
+
.getFieldNames()[finalI])));
+ }
+ }
+
+ // test output row
+ SeaTunnelRow input = new SeaTunnelRow(values);
+ SeaTunnelRow output = filterFieldTransform.transformRow(input);
+ Assertions.assertNotNull(output);
+ Assertions.assertEquals(
+ catalogTable.getTableSchema().getColumns().size() -
filterKeys.size(),
+ output.getFields().length);
+ for (int i = 0; i < output.getFields().length; i++) {
+ if
(!filterKeys.contains(catalogTable.getTableSchema().getFieldNames()[i])) {
+ Integer originalIndex =
+ catalogTable
+ .getTableSchema()
+ .toPhysicalRowDataType()
+
.indexOf(catalogTable.getTableSchema().getFieldNames()[i]);
+ // test the row's field value
+ Assertions.assertEquals(input.getFields()[originalIndex],
output.getFields()[i]);
+ }
+ }
+ }
+}