luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195872817
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
}
private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-    return schema.getPrimaryKey()
-            .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-            .orElse(new int[0]);
+    if (schema.getPrimaryKey().isPresent()) {
+        UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+        int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+        // sinkRowType may not contain full primary keys in case of row-level update or delete.
+        // in such case, return an empty array
+        for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+            int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+            if (fieldIndex == -1) {
+                return new int[0];
+            }
Review Comment:
We use the primary keys as shuffle keys, but they are still primary keys, which
may be used for other purposes in `CommonExecSink`. If not all primary key
columns are available, we should treat the sink as having no primary key at
all; that is the semantics of a primary key (a subset of the key columns does
not guarantee uniqueness).
By the way, I have moved this logic to `BatchExecSink` to make everything
clear, since only the UPDATE statement, which is supported only in batch mode,
needs to consider it.
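A minimal, Flink-free sketch of the "all or nothing" semantics described in the comment, assuming plain Java collections stand in for the planner types: a list of field names replaces `RowType` (whose `getFieldIndex` returns -1 for a missing field, as the diff's check relies on), and an `Optional` list of key columns replaces `ResolvedSchema#getPrimaryKey()`. The class `PrimaryKeyIndices` and its method signature are hypothetical, not the actual code in `CommonExecSink` or `BatchExecSink`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch: field names stand in for RowType, and an Optional list of
 * key column names stands in for ResolvedSchema#getPrimaryKey().
 */
public class PrimaryKeyIndices {

    /**
     * Returns the index of each primary-key column in the sink row, or an empty array
     * if there is no primary key or any key column is missing from the sink row.
     */
    static int[] getPrimaryKeyIndices(List<String> sinkFieldNames, Optional<List<String>> primaryKey) {
        if (!primaryKey.isPresent()) {
            return new int[0];
        }
        List<String> keyColumns = primaryKey.get();
        int[] indices = new int[keyColumns.size()];
        for (int i = 0; i < keyColumns.size(); i++) {
            // List#indexOf returns -1 when the column is absent, mirroring the diff's check.
            int fieldIndex = sinkFieldNames.indexOf(keyColumns.get(i));
            if (fieldIndex == -1) {
                // A partial key does not identify rows uniquely, so report "no primary key".
                return new int[0];
            }
            indices[i] = fieldIndex;
        }
        return indices;
    }

    public static void main(String[] args) {
        List<String> fullRow = Arrays.asList("id", "region", "amount");
        // Hypothetical row-level update/delete sink row that omits the "region" key column.
        List<String> partialRow = Arrays.asList("id", "amount");
        Optional<List<String>> pk = Optional.of(Arrays.asList("id", "region"));

        System.out.println(Arrays.toString(getPrimaryKeyIndices(fullRow, pk)));    // [0, 1]
        System.out.println(Arrays.toString(getPrimaryKeyIndices(partialRow, pk))); // []
    }
}
```

Returning an empty array instead of the partial indices means any caller that treats a non-empty result as a primary key never sees an incomplete one, whether it uses the indices for shuffling or for something else.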