luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195177400


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-        return schema.getPrimaryKey()
-                .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
     }
 
     private RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(schema, rowLevelUpdateSpec.getRequireColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());

Review Comment:
   Many places use `getProducedType` in the source ability specs, but this
seems to be the only place that would use something like `Optional<RowType>
getConsumedType`. I think we can keep it limited to here for now, to avoid
over-designing too early, and consider exposing it in `SinkAbilitySpec` if we
find it is needed in many places in the future.
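
   For reference, a rough sketch of what the deferred alternative (a
`getConsumedType`-style hook on the ability spec) might look like. This is not
Flink code: the types below are hypothetical stand-ins, a row type is modelled
as a plain list of column names, and the method name
`getConsumedColumnIndices` is an assumption made for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class ConsumedTypeSketch {

    /** Hypothetical stand-in for SinkAbilitySpec; not the real Flink interface. */
    interface SinkAbilitySpec {
        /** Assumed hook: indices of the columns the sink consumes, if restricted. */
        default Optional<int[]> getConsumedColumnIndices() {
            return Optional.empty();
        }
    }

    /** Stand-in for a row-level update spec that only writes some columns. */
    static final class RowLevelUpdateSpec implements SinkAbilitySpec {
        private final int[] requiredColumnIndices;

        RowLevelUpdateSpec(int[] requiredColumnIndices) {
            this.requiredColumnIndices = requiredColumnIndices;
        }

        @Override
        public Optional<int[]> getConsumedColumnIndices() {
            return Optional.of(requiredColumnIndices);
        }
    }

    /** Stand-in for any ability that does not restrict the consumed columns. */
    static final class OtherSpec implements SinkAbilitySpec {}

    /** Returns the columns written by the sink: the first restriction found, else all. */
    static List<String> physicalColumns(
            List<String> schemaColumns, List<SinkAbilitySpec> abilities) {
        for (SinkAbilitySpec spec : abilities) {
            Optional<int[]> consumed = spec.getConsumedColumnIndices();
            if (consumed.isPresent()) {
                return Arrays.stream(consumed.get())
                        .mapToObj(schemaColumns::get)
                        .collect(Collectors.toList());
            }
        }
        return schemaColumns;
    }
}
```

   With such a hook the caller would no longer need the `instanceof` chain,
but every ability spec would carry a method that only one call site uses today,
which is why keeping the logic local seems preferable for now.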


