This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9944684355 [Fix] Fix PrimaryKey in transform (#5704)
9944684355 is described below
commit 994468435576501b59307c6282f093f1ee6935a2
Author: Eric <[email protected]>
AuthorDate: Wed Oct 25 19:28:45 2023 +0800
[Fix] Fix PrimaryKey in transform (#5704)
---
.../fieldmapper/FieldMapperTransform.java | 23 +++++--
.../transform/filter/FilterFieldTransform.java | 28 +++++++--
.../seatunnel/transform/sql/SQLTransform.java | 70 ++++++++--------------
3 files changed, 65 insertions(+), 56 deletions(-)
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index 4eed0b72e1..f90c498219 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -92,6 +92,7 @@ public class FieldMapperTransform extends AbstractCatalogSupportTransform {
List<Column> outputColumns = new ArrayList<>(fieldMapper.size());
needReaderColIndex = new ArrayList<>(fieldMapper.size());
ArrayList<String> inputFieldNames =
Lists.newArrayList(seaTunnelRowType.getFieldNames());
+ ArrayList<String> outputFieldNames = new ArrayList<>();
fieldMapper.forEach(
(key, value) -> {
int fieldIndex = inputFieldNames.indexOf(key);
@@ -110,18 +111,32 @@ public class FieldMapperTransform extends AbstractCatalogSupportTransform {
oldColumn.getDefaultValue(),
oldColumn.getComment());
outputColumns.add(outputColumn);
+ outputFieldNames.add(outputColumn.getName());
needReaderColIndex.add(fieldIndex);
});
List<ConstraintKey> outputConstraintKeys =
inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .filter(
+ key -> {
+ List<String> constraintColumnNames =
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList());
+ return
outputFieldNames.containsAll(constraintColumnNames);
+ })
.map(ConstraintKey::copy)
.collect(Collectors.toList());
- PrimaryKey copiedPrimaryKey =
- inputCatalogTable.getTableSchema().getPrimaryKey() == null
- ? null
- :
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ PrimaryKey copiedPrimaryKey = null;
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+ && outputFieldNames.containsAll(
+
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+ copiedPrimaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ }
+
return TableSchema.builder()
.primaryKey(copiedPrimaryKey)
.columns(outputColumns)
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index 615b749c6d..011713a982 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -87,6 +87,7 @@ public class FilterFieldTransform extends AbstractCatalogSupportTransform {
inputCatalogTable.getTableSchema().toPhysicalRowDataType();
inputValueIndex = new int[filterFields.size()];
+ ArrayList<String> outputFieldNames = new ArrayList<>();
for (int i = 0; i < filterFields.size(); i++) {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
@@ -97,21 +98,36 @@ public class FilterFieldTransform extends AbstractCatalogSupportTransform {
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
+ outputFieldNames.add(
+
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
}
- List<ConstraintKey> copyConstraintKeys =
+ List<ConstraintKey> outputConstraintKeys =
inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .filter(
+ key -> {
+ List<String> constraintColumnNames =
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList());
+ return
outputFieldNames.containsAll(constraintColumnNames);
+ })
.map(ConstraintKey::copy)
.collect(Collectors.toList());
- PrimaryKey copiedPrimaryKey =
- inputCatalogTable.getTableSchema().getPrimaryKey() == null
- ? null
- :
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ PrimaryKey copiedPrimaryKey = null;
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+ && outputFieldNames.containsAll(
+
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+ copiedPrimaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey().copy();
+ }
+
return TableSchema.builder()
.columns(outputColumns)
.primaryKey(copiedPrimaryKey)
- .constraintKey(copyConstraintKeys)
+ .constraintKey(outputConstraintKeys)
.build();
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index fca94e5af8..2fcbaa05ee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -38,7 +37,9 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import static
org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
@@ -119,55 +120,32 @@ public class SQLTransform extends AbstractCatalogSupportTransform {
tryOpen();
List<String> inputColumnsMapping = new ArrayList<>();
SeaTunnelRowType outRowType =
sqlEngine.typeMapping(inputColumnsMapping);
+ List<String> outputColumns = Arrays.asList(outRowType.getFieldNames());
TableSchema.Builder builder = TableSchema.builder();
- if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
- List<String> outPkColumnNames = new ArrayList<>();
- for (String pkColumnName :
-
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) {
- for (int i = 0; i < inputColumnsMapping.size(); i++) {
- if (pkColumnName.equals(inputColumnsMapping.get(i))) {
- outPkColumnNames.add(outRowType.getFieldName(i));
- }
- }
- }
- if (!outPkColumnNames.isEmpty()) {
- builder.primaryKey(
- PrimaryKey.of(
-
inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(),
- outPkColumnNames));
- }
- }
- if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) {
- List<ConstraintKey> outConstraintKey = new ArrayList<>();
- for (ConstraintKey constraintKey :
- inputCatalogTable.getTableSchema().getConstraintKeys()) {
- List<ConstraintKey.ConstraintKeyColumn>
outConstraintColumnKeys = new ArrayList<>();
- for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn :
- constraintKey.getColumnNames()) {
- String constraintColumnName =
constraintKeyColumn.getColumnName();
- for (int i = 0; i < inputColumnsMapping.size(); i++) {
- if
(constraintColumnName.equals(inputColumnsMapping.get(i))) {
- outConstraintColumnKeys.add(
- ConstraintKey.ConstraintKeyColumn.of(
- outRowType.getFieldName(i),
-
constraintKeyColumn.getSortType()));
- }
- }
- }
- if (!outConstraintColumnKeys.isEmpty()) {
- outConstraintKey.add(
- ConstraintKey.of(
- constraintKey.getConstraintType(),
- constraintKey.getConstraintName(),
- outConstraintColumnKeys));
- }
- }
- if (!outConstraintKey.isEmpty()) {
- builder.constraintKey(outConstraintKey);
- }
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
+ && outputColumns.containsAll(
+
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
+ builder =
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
}
+ List<ConstraintKey> outputConstraintKeys =
+ inputCatalogTable.getTableSchema().getConstraintKeys().stream()
+ .filter(
+ key -> {
+ List<String> constraintColumnNames =
+ key.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.toList());
+ return
outputColumns.containsAll(constraintColumnNames);
+ })
+ .map(ConstraintKey::copy)
+ .collect(Collectors.toList());
+
+ builder = builder.constraintKey(outputConstraintKeys);
+
String[] fieldNames = outRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
List<Column> columns = new ArrayList<>(fieldNames.length);