This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ffae2f71a2 [Improve][Transform] Remove can't find field exception
(#6691)
ffae2f71a2 is described below
commit ffae2f71a28c18ae3f4d97480ac3cbda5d12f79f
Author: xiaochen <[email protected]>
AuthorDate: Fri Apr 12 19:14:54 2024 +0800
[Improve][Transform] Remove can't find field exception (#6691)
---
.../transform/filter/FilterFieldTransform.java | 50 ++++++++--------------
1 file changed, 19 insertions(+), 31 deletions(-)
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index aaf3168e1b..b18aed0542 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -35,7 +35,6 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -43,24 +42,16 @@ import java.util.stream.Collectors;
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
public static final String PLUGIN_NAME = "Filter";
private int[] inputValueIndex;
- private final String[] fields;
+ private final List<String> fields;
public FilterFieldTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable
catalogTable) {
super(catalogTable);
SeaTunnelRowType seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
- fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new
String[0]);
+ fields = config.get(FilterFieldTransformConfig.KEY_FIELDS);
List<String> canNotFoundFields =
- Arrays.stream(fields)
- .filter(
- field -> {
- try {
- seaTunnelRowType.indexOf(field);
- return false;
- } catch (Exception e) {
- return true;
- }
- })
+ fields.stream()
+ .filter(field -> seaTunnelRowType.indexOf(field,
false) == -1)
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(canNotFoundFields)) {
@@ -77,34 +68,32 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
// todo reuse array container if not remove fields
- Object[] values = new Object[fields.length];
- for (int i = 0; i < fields.length; i++) {
+ Object[] values = new Object[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
values[i] = inputRow.getField(inputValueIndex[i]);
}
- return new SeaTunnelRow(values);
+ SeaTunnelRow outputRow = new SeaTunnelRow(values);
+ outputRow.setRowKind(inputRow.getRowKind());
+ outputRow.setTableId(inputRow.getTableId());
+ return outputRow;
}
@Override
protected TableSchema transformTableSchema() {
- List<String> filterFields = Arrays.asList(fields);
List<Column> outputColumns = new ArrayList<>();
SeaTunnelRowType seaTunnelRowType =
inputCatalogTable.getTableSchema().toPhysicalRowDataType();
- inputValueIndex = new int[filterFields.size()];
+ inputValueIndex = new int[fields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
- for (int i = 0; i < filterFields.size(); i++) {
- String field = filterFields.get(i);
+ List<Column> inputColumns =
inputCatalogTable.getTableSchema().getColumns();
+ for (int i = 0; i < fields.size(); i++) {
+ String field = fields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
- if (inputFieldIndex == -1) {
- throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
- }
inputValueIndex[i] = inputFieldIndex;
- outputColumns.add(
-
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
- outputFieldNames.add(
-
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
+ outputColumns.add(inputColumns.get(inputFieldIndex).copy());
+ outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
}
List<ConstraintKey> outputConstraintKeys =
@@ -123,10 +112,9 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
.collect(Collectors.toList());
PrimaryKey copiedPrimaryKey = null;
- if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
- && outputFieldNames.containsAll(
-
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
- copiedPrimaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ PrimaryKey primaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey();
+ if (primaryKey != null &&
outputFieldNames.containsAll(primaryKey.getColumnNames())) {
+ copiedPrimaryKey = primaryKey.copy();
}
return TableSchema.builder()