fsk119 commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1194926178


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-        return schema.getPrimaryKey()
-                .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }

Review Comment:
   I think we should try our best to validate all available pk columns.
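   A minimal sketch of that suggestion, using plain `List<String>` stand-ins for `RowType`/`UniqueConstraint` rather than Flink's real classes: resolve every primary-key column first, and only afterwards decide whether the key is complete, instead of bailing out on the first missing column.

```java
import java.util.List;

class PkIndexSketch {
    // Hypothetical stand-in for RowType#getFieldIndex: position of `column`
    // among the sink's field names, or -1 if absent.
    static int getFieldIndex(List<String> sinkFieldNames, String column) {
        return sinkFieldNames.indexOf(column);
    }

    static int[] getPrimaryKeyIndices(List<String> sinkFieldNames, List<String> pkColumns) {
        int[] indices = new int[pkColumns.size()];
        boolean complete = true;
        for (int i = 0; i < pkColumns.size(); i++) {
            // resolve (and thereby validate) every available pk column,
            // even after one has already been found missing
            indices[i] = getFieldIndex(sinkFieldNames, pkColumns.get(i));
            if (indices[i] == -1) {
                complete = false;
            }
        }
        // row-level update/delete may write only partial columns; an
        // incomplete key degrades to "no primary key" for the sink
        return complete ? indices : new int[0];
    }
}
```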



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -78,6 +87,11 @@ public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
         return rowLevelDeleteMode;
     }
 
+    @Nonnull
+    public int[] getRequiredPhysicalColumnIndices() {
+        return requiredPhysicalColumnIndices;
+    }
+
     @Override
     public boolean equals(Object o) {

Review Comment:
   Don't forget to modify the `equals` method (and `hashCode`) for the new field.
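   A minimal sketch of the point (not the real `RowLevelDeleteSpec`; the mode is stubbed as a `String`): an `int[]` field must go through `Arrays.equals` / `Arrays.hashCode`, since `Objects.equals` would only compare array references.

```java
import java.util.Arrays;
import java.util.Objects;

class DeleteSpecSketch {
    private final String rowLevelDeleteMode; // stand-in for the enum
    private final int[] requiredPhysicalColumnIndices;

    DeleteSpecSketch(String mode, int[] indices) {
        this.rowLevelDeleteMode = mode;
        this.requiredPhysicalColumnIndices = indices;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DeleteSpecSketch that = (DeleteSpecSketch) o;
        // the array field needs Arrays.equals, element-by-element
        return Objects.equals(rowLevelDeleteMode, that.rowLevelDeleteMode)
                && Arrays.equals(requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices);
    }

    @Override
    public int hashCode() {
        // likewise, Arrays.hashCode for the array field
        return 31 * Objects.hash(rowLevelDeleteMode)
                + Arrays.hashCode(requiredPhysicalColumnIndices);
    }
}
```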



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -38,27 +38,36 @@
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can delete existing data
+ * for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelDelete")
 public class RowLevelDeleteSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";

Review Comment:
   `ProjectPushDownSpec` also contains an array to mark the projection. I think `requiredPhysicalColumn` is enough.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-        return schema.getPrimaryKey()
-                .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
     }
 
     private RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(schema, rowLevelUpdateSpec.getRequireColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());

Review Comment:
   I just wonder whether it's better to introduce a method like `Optional<RowType> getConsumedType` for this sink ability spec. Actually, we already have `getProducedType` in the source ability spec.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -78,6 +87,11 @@ public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
         return rowLevelDeleteMode;
     }
 
+    @Nonnull

Review Comment:
   Remove the annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
