luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195167785


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }
 
     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-        return schema.getPrimaryKey()
-                .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }

Review Comment:
   Here we do validate all available pk columns.
   The logic tries to find every pk column of the `ResolvedSchema`; if even 
one pk column can't be found, the pk is incomplete, in which case we treat 
it as having no pk columns.
   Note: this only happens for an update statement, when the required columns 
returned by the connector don't contain all pk columns.
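
To illustrate the all-or-nothing behaviour described above, here is a minimal standalone sketch. It is not the PR's code: the class `PrimaryKeyIndicesSketch`, the method `primaryKeyIndices`, and the use of plain `List<String>` in place of `RowType` and `UniqueConstraint` are assumptions made only so the example runs without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the lookup in getPrimaryKeyIndices; plain lists
// replace Flink's RowType (sinkFields) and UniqueConstraint (pkColumns).
public class PrimaryKeyIndicesSketch {

    /**
     * Returns the position of each pk column within sinkFields, or an empty
     * array as soon as one pk column is missing from sinkFields.
     */
    public static int[] primaryKeyIndices(List<String> sinkFields, List<String> pkColumns) {
        int[] indices = new int[pkColumns.size()];
        for (int i = 0; i < pkColumns.size(); i++) {
            int fieldIndex = sinkFields.indexOf(pkColumns.get(i));
            if (fieldIndex == -1) {
                // One pk column is absent: treat the sink as having no pk at all.
                return new int[0];
            }
            indices[i] = fieldIndex;
        }
        return indices;
    }

    public static void main(String[] args) {
        List<String> pk = List.of("id", "region");
        // All pk columns present: prints [0, 2]
        System.out.println(Arrays.toString(primaryKeyIndices(List.of("id", "name", "region"), pk)));
        // "id" is missing, e.g. not among the required columns: prints []
        System.out.println(Arrays.toString(primaryKeyIndices(List.of("name", "region"), pk)));
    }
}
```

Returning an empty array rather than a partial one keeps downstream code from treating an incomplete key as a usable primary key.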
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.