luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195872817


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
-        return schema.getPrimaryKey()
-                .map(k -> 
k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new 
int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of 
row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = 
sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }

Review Comment:
   We use primary keys as shuffle keys, but they still are primary keys which 
maybe used for other purposes in the `CommonExecSink`.  If not all primary keys 
are available, we should consider it as no primary keys which is the semantic 
of primary keys.
   Btw, I have moved such logic to `BatchExecSink` to make every clear since 
only update statement which is only supported in batch  needs to consider it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to