gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3289465994


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -85,6 +100,33 @@ public ToChangelogFunction(final SpecializedContext context) {
             validateOpMap(this.rawOpMap, tableSemantics);
         }
         this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
+        this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType();
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(3, Boolean.class).orElse(false);
+        final int[] upsertKeys = tableSemantics.upsertKeyColumns();
+        final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
+        // We emit partial deletes when the caller did not ask for full ones and we know which
+        // input columns identify a row. The framework prepends partition-key columns to the
+        // output without consulting the function, so in set semantics partition keys are
+        // preserved on DELETE rows for free. In row semantics we rely on the input table's
+        // upsert key to preserve identifying columns on DELETE; without one we cannot null
+        // anything safely and pass the input through unchanged.
+        this.nullPayloadOnDelete =
+                !producesFullDeletes && (hasPartitionBy || upsertKeys.length > 0);

Review Comment:
   Also, do we want to throw a clear validation error when the user asks for
   `producesFullDeletes` but we cannot produce full deletes?
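
   To make the question concrete, here is a rough, standalone sketch of what such a
   check could look like next to the condition from the hunk. It is not the PR's
   code: the class `DeleteModeSketch`, the method names, and the
   `canProduceFullDeletes` flag are all hypothetical, and how the function would
   actually know that full deletes are unavailable is left open. It also uses
   `IllegalArgumentException` only to stay self-contained; the real code would
   presumably throw whatever validation exception the function already uses.

```java
public class DeleteModeSketch {

    /** Mirrors the nullPayloadOnDelete condition from the hunk above. */
    public static boolean nullPayloadOnDelete(
            boolean producesFullDeletes, boolean hasPartitionBy, int upsertKeyCount) {
        return !producesFullDeletes && (hasPartitionBy || upsertKeyCount > 0);
    }

    /**
     * Hypothetical validation: fail early when full deletes are requested but
     * cannot be produced. How canProduceFullDeletes is derived is an assumption
     * and is not defined anywhere in the PR.
     */
    public static void validateFullDeletes(
            boolean producesFullDeletes, boolean canProduceFullDeletes) {
        if (producesFullDeletes && !canProduceFullDeletes) {
            throw new IllegalArgumentException(
                    "producesFullDeletes was requested, but full DELETE rows "
                            + "cannot be produced for this input.");
        }
    }

    public static void main(String[] args) {
        // Partial deletes: not asked for full ones, and PARTITION BY is present.
        System.out.println(nullPayloadOnDelete(false, true, 0));
        // Pass-through: no partition keys and no upsert key to identify the row.
        System.out.println(nullPayloadOnDelete(false, false, 0));
        // The hypothetical validation rejects an unsatisfiable request.
        try {
            validateFullDeletes(true, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Failing in the constructor would surface the problem at planning time instead
   of silently emitting rows the caller did not expect.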



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]