This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9944684355 [Fix] Fix PrimaryKey in transform (#5704)
9944684355 is described below

commit 994468435576501b59307c6282f093f1ee6935a2
Author: Eric <[email protected]>
AuthorDate: Wed Oct 25 19:28:45 2023 +0800

    [Fix] Fix PrimaryKey in transform (#5704)
---
 .../fieldmapper/FieldMapperTransform.java          | 23 +++++--
 .../transform/filter/FilterFieldTransform.java     | 28 +++++++--
 .../seatunnel/transform/sql/SQLTransform.java      | 70 ++++++++--------------
 3 files changed, 65 insertions(+), 56 deletions(-)

diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index 4eed0b72e1..f90c498219 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -92,6 +92,7 @@ public class FieldMapperTransform extends 
AbstractCatalogSupportTransform {
         List<Column> outputColumns = new ArrayList<>(fieldMapper.size());
         needReaderColIndex = new ArrayList<>(fieldMapper.size());
         ArrayList<String> inputFieldNames = 
Lists.newArrayList(seaTunnelRowType.getFieldNames());
+        ArrayList<String> outputFieldNames = new ArrayList<>();
         fieldMapper.forEach(
                 (key, value) -> {
                     int fieldIndex = inputFieldNames.indexOf(key);
@@ -110,18 +111,32 @@ public class FieldMapperTransform extends 
AbstractCatalogSupportTransform {
                                     oldColumn.getDefaultValue(),
                                     oldColumn.getComment());
                     outputColumns.add(outputColumn);
+                    outputFieldNames.add(outputColumn.getName());
                     needReaderColIndex.add(fieldIndex);
                 });
 
         List<ConstraintKey> outputConstraintKeys =
                 inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .filter(
+                                key -> {
+                                    List<String> constraintColumnNames =
+                                            key.getColumnNames().stream()
+                                                    .map(
+                                                            
ConstraintKey.ConstraintKeyColumn
+                                                                    
::getColumnName)
+                                                    
.collect(Collectors.toList());
+                                    return 
outputFieldNames.containsAll(constraintColumnNames);
+                                })
                         .map(ConstraintKey::copy)
                         .collect(Collectors.toList());
 
-        PrimaryKey copiedPrimaryKey =
-                inputCatalogTable.getTableSchema().getPrimaryKey() == null
-                        ? null
-                        : 
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        PrimaryKey copiedPrimaryKey = null;
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+                && outputFieldNames.containsAll(
+                        
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+            copiedPrimaryKey = 
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        }
+
         return TableSchema.builder()
                 .primaryKey(copiedPrimaryKey)
                 .columns(outputColumns)
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index 615b749c6d..011713a982 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -87,6 +87,7 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
                 inputCatalogTable.getTableSchema().toPhysicalRowDataType();
 
         inputValueIndex = new int[filterFields.size()];
+        ArrayList<String> outputFieldNames = new ArrayList<>();
         for (int i = 0; i < filterFields.size(); i++) {
             String field = filterFields.get(i);
             int inputFieldIndex = seaTunnelRowType.indexOf(field);
@@ -97,21 +98,36 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
             inputValueIndex[i] = inputFieldIndex;
             outputColumns.add(
                     
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
+            outputFieldNames.add(
+                    
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
         }
 
-        List<ConstraintKey> copyConstraintKeys =
+        List<ConstraintKey> outputConstraintKeys =
                 inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .filter(
+                                key -> {
+                                    List<String> constraintColumnNames =
+                                            key.getColumnNames().stream()
+                                                    .map(
+                                                            
ConstraintKey.ConstraintKeyColumn
+                                                                    
::getColumnName)
+                                                    
.collect(Collectors.toList());
+                                    return 
outputFieldNames.containsAll(constraintColumnNames);
+                                })
                         .map(ConstraintKey::copy)
                         .collect(Collectors.toList());
 
-        PrimaryKey copiedPrimaryKey =
-                inputCatalogTable.getTableSchema().getPrimaryKey() == null
-                        ? null
-                        : 
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        PrimaryKey copiedPrimaryKey = null;
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+                && outputFieldNames.containsAll(
+                        
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+            copiedPrimaryKey = 
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+        }
+
         return TableSchema.builder()
                 .columns(outputColumns)
                 .primaryKey(copiedPrimaryKey)
-                .constraintKey(copyConstraintKeys)
+                .constraintKey(outputConstraintKeys)
                 .build();
     }
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index fca94e5af8..2fcbaa05ee 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -38,7 +37,9 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
 
@@ -119,55 +120,32 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
         tryOpen();
         List<String> inputColumnsMapping = new ArrayList<>();
         SeaTunnelRowType outRowType = 
sqlEngine.typeMapping(inputColumnsMapping);
+        List<String> outputColumns = Arrays.asList(outRowType.getFieldNames());
 
         TableSchema.Builder builder = TableSchema.builder();
-        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
-            List<String> outPkColumnNames = new ArrayList<>();
-            for (String pkColumnName :
-                    
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) {
-                for (int i = 0; i < inputColumnsMapping.size(); i++) {
-                    if (pkColumnName.equals(inputColumnsMapping.get(i))) {
-                        outPkColumnNames.add(outRowType.getFieldName(i));
-                    }
-                }
-            }
-            if (!outPkColumnNames.isEmpty()) {
-                builder.primaryKey(
-                        PrimaryKey.of(
-                                
inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(),
-                                outPkColumnNames));
-            }
-        }
-        if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) {
-            List<ConstraintKey> outConstraintKey = new ArrayList<>();
-            for (ConstraintKey constraintKey :
-                    inputCatalogTable.getTableSchema().getConstraintKeys()) {
-                List<ConstraintKey.ConstraintKeyColumn> 
outConstraintColumnKeys = new ArrayList<>();
-                for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn :
-                        constraintKey.getColumnNames()) {
-                    String constraintColumnName = 
constraintKeyColumn.getColumnName();
-                    for (int i = 0; i < inputColumnsMapping.size(); i++) {
-                        if 
(constraintColumnName.equals(inputColumnsMapping.get(i))) {
-                            outConstraintColumnKeys.add(
-                                    ConstraintKey.ConstraintKeyColumn.of(
-                                            outRowType.getFieldName(i),
-                                            
constraintKeyColumn.getSortType()));
-                        }
-                    }
-                }
-                if (!outConstraintColumnKeys.isEmpty()) {
-                    outConstraintKey.add(
-                            ConstraintKey.of(
-                                    constraintKey.getConstraintType(),
-                                    constraintKey.getConstraintName(),
-                                    outConstraintColumnKeys));
-                }
-            }
-            if (!outConstraintKey.isEmpty()) {
-                builder.constraintKey(outConstraintKey);
-            }
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+                && outputColumns.containsAll(
+                        
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+            builder = 
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
         }
 
+        List<ConstraintKey> outputConstraintKeys =
+                inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                        .filter(
+                                key -> {
+                                    List<String> constraintColumnNames =
+                                            key.getColumnNames().stream()
+                                                    .map(
+                                                            
ConstraintKey.ConstraintKeyColumn
+                                                                    
::getColumnName)
+                                                    
.collect(Collectors.toList());
+                                    return 
outputColumns.containsAll(constraintColumnNames);
+                                })
+                        .map(ConstraintKey::copy)
+                        .collect(Collectors.toList());
+
+        builder = builder.constraintKey(outputConstraintKeys);
+
         String[] fieldNames = outRowType.getFieldNames();
         SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
         List<Column> columns = new ArrayList<>(fieldNames.length);

Reply via email to