This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ffae2f71a2 [Improve][Transform] Remove can't find field exception  
(#6691)
ffae2f71a2 is described below

commit ffae2f71a28c18ae3f4d97480ac3cbda5d12f79f
Author: xiaochen <[email protected]>
AuthorDate: Fri Apr 12 19:14:54 2024 +0800

    [Improve][Transform] Remove can't find field exception  (#6691)
---
 .../transform/filter/FilterFieldTransform.java     | 50 ++++++++--------------
 1 file changed, 19 insertions(+), 31 deletions(-)

diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index aaf3168e1b..b18aed0542 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -35,7 +35,6 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -43,24 +42,16 @@ import java.util.stream.Collectors;
 public class FilterFieldTransform extends AbstractCatalogSupportTransform {
     public static final String PLUGIN_NAME = "Filter";
     private int[] inputValueIndex;
-    private final String[] fields;
+    private final List<String> fields;
 
     public FilterFieldTransform(
             @NonNull ReadonlyConfig config, @NonNull CatalogTable 
catalogTable) {
         super(catalogTable);
         SeaTunnelRowType seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
-        fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new 
String[0]);
+        fields = config.get(FilterFieldTransformConfig.KEY_FIELDS);
         List<String> canNotFoundFields =
-                Arrays.stream(fields)
-                        .filter(
-                                field -> {
-                                    try {
-                                        seaTunnelRowType.indexOf(field);
-                                        return false;
-                                    } catch (Exception e) {
-                                        return true;
-                                    }
-                                })
+                fields.stream()
+                        .filter(field -> seaTunnelRowType.indexOf(field, 
false) == -1)
                         .collect(Collectors.toList());
 
         if (!CollectionUtils.isEmpty(canNotFoundFields)) {
@@ -77,34 +68,32 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
     @Override
     protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
         // todo reuse array container if not remove fields
-        Object[] values = new Object[fields.length];
-        for (int i = 0; i < fields.length; i++) {
+        Object[] values = new Object[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
             values[i] = inputRow.getField(inputValueIndex[i]);
         }
-        return new SeaTunnelRow(values);
+        SeaTunnelRow outputRow = new SeaTunnelRow(values);
+        outputRow.setRowKind(inputRow.getRowKind());
+        outputRow.setTableId(inputRow.getTableId());
+        return outputRow;
     }
 
     @Override
     protected TableSchema transformTableSchema() {
-        List<String> filterFields = Arrays.asList(fields);
         List<Column> outputColumns = new ArrayList<>();
 
         SeaTunnelRowType seaTunnelRowType =
                 inputCatalogTable.getTableSchema().toPhysicalRowDataType();
 
-        inputValueIndex = new int[filterFields.size()];
+        inputValueIndex = new int[fields.size()];
         ArrayList<String> outputFieldNames = new ArrayList<>();
-        for (int i = 0; i < filterFields.size(); i++) {
-            String field = filterFields.get(i);
+        List<Column> inputColumns = 
inputCatalogTable.getTableSchema().getColumns();
+        for (int i = 0; i < fields.size(); i++) {
+            String field = fields.get(i);
             int inputFieldIndex = seaTunnelRowType.indexOf(field);
-            if (inputFieldIndex == -1) {
-                throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
-            }
             inputValueIndex[i] = inputFieldIndex;
-            outputColumns.add(
-                    
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
-            outputFieldNames.add(
-                    
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
+            outputColumns.add(inputColumns.get(inputFieldIndex).copy());
+            outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
         }
 
         List<ConstraintKey> outputConstraintKeys =
@@ -123,10 +112,9 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
                         .collect(Collectors.toList());
 
         PrimaryKey copiedPrimaryKey = null;
-        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
-                && outputFieldNames.containsAll(
-                        
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
-            copiedPrimaryKey = 
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        PrimaryKey primaryKey = 
inputCatalogTable.getTableSchema().getPrimaryKey();
+        if (primaryKey != null && 
outputFieldNames.containsAll(primaryKey.getColumnNames())) {
+            copiedPrimaryKey = primaryKey.copy();
         }
 
         return TableSchema.builder()

Reply via email to