luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195167785
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
}
private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema
schema) {
- return schema.getPrimaryKey()
- .map(k ->
k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
- .orElse(new int[0]);
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+ int[] primaryKeyIndices = new
int[uniqueConstraint.getColumns().size()];
+ // sinkRowType may not contain full primary keys in case of
row-level update or delete.
+ // in such case, return an empty array
+ for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+ int fieldIndex =
sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+ if (fieldIndex == -1) {
+ return new int[0];
+ }
Review Comment:
In here, we do validate all avaliable pk columns.
The logic is try to find all the pk columns from the `ResolvedSchema`, but
if for one pk column, we can't find from the `ResolvedSchema`, that means the
pk columns miss some columns in which case we will consider it as no pk
columns.
Note: this case only happens in update statement, the required columns
returned by connector don't contain all pk columns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]