fsk119 commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195847470


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -78,6 +87,11 @@ public SupportsRowLevelDelete.RowLevelDeleteMode 
getRowLevelDeleteMode() {
         return rowLevelDeleteMode;
     }
 
+    @Nonnull

Review Comment:
   Actually in the code style, it suggests it's not a necessary behaviour to 
mark the field not null.  BTW, it's very verbose if we mark every method's 
return type not null[1].
   
   [1] 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#nullability-of-the-mutable-parts



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
-        return schema.getPrimaryKey()
-                .map(k -> 
k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new 
int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of 
row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = 
sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
     }
 
     private RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : 
tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = 
(RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(schema, 
rowLevelUpdateSpec.getRequireColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = 
(RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, 
rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());

Review Comment:
   Fine. We can modify this if we needed in the future.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
-        return schema.getPrimaryKey()
-                .map(k -> 
k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new 
int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of 
row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = 
sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }

Review Comment:
   I mean why we don't return all available positions here?
   
   ```
     uniqueConstraint.getColumns().stream()
                       .mapToInt(sinkRowType::getFieldIndex)
                       .filter(i -> i == -1)
                       .toArray();
   ```
   
   I find the indcies is used as shuffle key, in the update statement we don't 
need shuffle?
   
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -38,27 +38,36 @@
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can 
delete existing data
+ * for {@link 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelDelete")
 public class RowLevelDeleteSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = 
"rowLevelDeleteMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";

Review Comment:
   oh. I mean it's better to rename to `requiredPhysicalColumn` to align the 
name behaviour. It's fine to use `requiredPhysicalColumnIndices`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to