luoyuxia commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195177400
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
     }

     private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
-        return schema.getPrimaryKey()
-                .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
-                .orElse(new int[0]);
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // sinkRowType may not contain full primary keys in case of row-level update or delete.
+            // in such case, return an empty array
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
     }

     private RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(schema, rowLevelUpdateSpec.getRequireColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
Review Comment:
Many places use `getProducedType` in the source ability specs, but this seems
to be the only place that would use something like `Optional<RowType>
getConsumedType`. I think we can keep the logic limited to here for now, to
avoid thinking too much too early, and consider exposing it in
`SinkAbilitySpec` if we find it is needed in many places in the future.
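
To make the trade-off concrete, here is a minimal, self-contained sketch of the "keep it local" option. Everything in it is a hypothetical stand-in rather than the real Flink classes: the nested `SinkAbilitySpec`, `RowLevelUpdateSpec`, `RowLevelDeleteSpec`, and `OverwriteSpec` types only mirror the names in the diff, column names replace `RowType`, and `findRequiredColumnIndices` and `physicalColumns` are illustrative helper names. The point is only that a single private helper can hold the `instanceof` dispatch, so the ability interface does not need a new method yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ConsumedTypeSketch {

    // Hypothetical stand-in for the sink ability interface; it deliberately has
    // no getConsumedType-style method.
    interface SinkAbilitySpec {}

    static final class RowLevelUpdateSpec implements SinkAbilitySpec {
        private final int[] requireColumnIndices;

        RowLevelUpdateSpec(int[] requireColumnIndices) {
            this.requireColumnIndices = requireColumnIndices;
        }

        int[] getRequireColumnIndices() {
            return requireColumnIndices;
        }
    }

    static final class RowLevelDeleteSpec implements SinkAbilitySpec {
        private final int[] requiredPhysicalColumnIndices;

        RowLevelDeleteSpec(int[] requiredPhysicalColumnIndices) {
            this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
        }

        int[] getRequiredPhysicalColumnIndices() {
            return requiredPhysicalColumnIndices;
        }
    }

    // An ability that does not restrict the written columns.
    static final class OverwriteSpec implements SinkAbilitySpec {}

    // The dispatch lives in one local helper instead of on the interface.
    static Optional<int[]> findRequiredColumnIndices(List<SinkAbilitySpec> specs) {
        if (specs == null) {
            return Optional.empty();
        }
        for (SinkAbilitySpec spec : specs) {
            if (spec instanceof RowLevelUpdateSpec) {
                return Optional.of(((RowLevelUpdateSpec) spec).getRequireColumnIndices());
            } else if (spec instanceof RowLevelDeleteSpec) {
                return Optional.of(
                        ((RowLevelDeleteSpec) spec).getRequiredPhysicalColumnIndices());
            }
        }
        return Optional.empty();
    }

    // Projects the full column list onto the required indices, or returns the
    // full list when no row-level spec is present.
    static List<String> physicalColumns(List<String> allColumns, List<SinkAbilitySpec> specs) {
        Optional<int[]> indices = findRequiredColumnIndices(specs);
        if (!indices.isPresent()) {
            return allColumns;
        }
        List<String> projected = new ArrayList<>();
        for (int index : indices.get()) {
            projected.add(allColumns.get(index));
        }
        return projected;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "price");
        List<SinkAbilitySpec> updateSpecs =
                Arrays.asList(new OverwriteSpec(), new RowLevelUpdateSpec(new int[] {0, 2}));
        System.out.println(physicalColumns(columns, updateSpecs));
        System.out.println(physicalColumns(columns, null));
    }
}
```

If more callers later need the same answer, the body of `findRequiredColumnIndices` could move behind an interface method returning an `Optional`, without changing call sites like `physicalColumns`.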