gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3289462696
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -85,6 +100,33 @@ public ToChangelogFunction(final SpecializedContext context) {
validateOpMap(this.rawOpMap, tableSemantics);
}
this.outputIndices =
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
+ this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType();
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(3, Boolean.class).orElse(false);
+ final int[] upsertKeys = tableSemantics.upsertKeyColumns();
+ final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
+ // We emit partial deletes when the caller did not ask for full ones and we know which
+ // input columns identify a row. The framework prepends partition-key columns to the
+ // output without consulting the function, so in set semantics partition keys are
+ // preserved on DELETE rows for free. In row semantics we rely on the input table's
+ // upsert key to preserve identifying columns on DELETE; without one we cannot null
+ // anything safely and pass the input through unchanged.
+ this.nullPayloadOnDelete =
+ !producesFullDeletes && (hasPartitionBy || upsertKeys.length > 0);
Review Comment:
Move this into a small helper method so that the constructor stays easy to read.
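A minimal sketch of what that extraction could look like, not the actual change. The class name `NullPayloadDecision` and the method name `shouldNullPayloadOnDelete` are hypothetical; in the PR this would presumably be a private static method on `ToChangelogFunction`. To stay self-contained, the sketch takes plain values instead of Flink's `TableSemantics` and `CallContext`, so the constructor would still read the argument and the column arrays before calling it.

```java
// Hypothetical helper holding the decision that currently sits inline in the constructor.
final class NullPayloadDecision {

    private NullPayloadDecision() {}

    /**
     * Decides whether DELETE rows should have their payload columns nulled.
     *
     * Partial deletes are emitted only when the caller did not ask for full deletes and
     * the identifying columns are known: either partition keys (set semantics, preserved
     * by the framework) or the input table's upsert key (row semantics). Without either,
     * nothing can be nulled safely and the input is passed through unchanged.
     */
    static boolean shouldNullPayloadOnDelete(
            final boolean producesFullDeletes,
            final int[] partitionByColumns,
            final int[] upsertKeys) {
        if (producesFullDeletes) {
            return false;
        }
        final boolean hasPartitionBy = partitionByColumns.length > 0;
        final boolean hasUpsertKey = upsertKeys.length > 0;
        return hasPartitionBy || hasUpsertKey;
    }
}
```

With a helper of this shape, the constructor assignment shrinks to a single call, and the explanatory comment moves onto the method's Javadoc next to the logic it describes.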