gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3312237978


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -157,7 +157,7 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram UPSERT =
             TableTestProgram.of(
                             "to-changelog-upsert-input",
-                            "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes")
+                            "upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits partial deletes")

Review Comment:
   The description says "emits partial deletes", but the test doesn't pass
produces_full_deletes, so it defaults to true and the output is
+I[DELETE, Bob, 20] (a full row). The original "and full deletes" was right.
   
   ```suggestion
                               "upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits full deletes")
   ```
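
   To illustrate the reasoning above, here is a minimal sketch in plain Java. It is not the Flink implementation: `DeleteRowSketch` and `deleteRow` are hypothetical names, and the shape of a partial delete (key column kept, non-key column set to null) is an assumption made only for illustration. The one thing taken from the review is that an omitted produces_full_deletes defaults to true.

```java
import java.util.Arrays;

class DeleteRowSketch {

    // Hypothetical helper: builds the row emitted for a DELETE on key `name`.
    // producesFullDeletes == null models "argument not passed", which,
    // per the review comment, defaults to true.
    static Object[] deleteRow(String name, Integer score, Boolean producesFullDeletes) {
        boolean full = producesFullDeletes == null || producesFullDeletes;
        // Assumption: a partial delete keeps only the key and nulls the rest.
        return new Object[] {"DELETE", name, full ? score : null};
    }

    public static void main(String[] args) {
        // Argument omitted: full row, matching +I[DELETE, Bob, 20].
        System.out.println(Arrays.toString(deleteRow("Bob", 20, null)));
        // Explicitly disabled: the non-key column is dropped in this sketch.
        System.out.println(Arrays.toString(deleteRow("Bob", 20, false)));
    }
}
```

   Under these assumptions, the test as written would only produce the partial form if it passed the argument explicitly.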



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java:
##########
@@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() {
                 RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
                 /* passColumnsThrough */ false,
                 /* hasSetSemantics */ true,
-                /* timeColumn */ 1);
+                /* timeColumn */ 1,
+                /* upsertKeyColumns */ new int[0]);

Review Comment:
   I think this will not compile: it still uses the old type.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -121,6 +123,10 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE)
     private final ChangelogMode outputChangelogMode;
 
+    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private final List<int[]> inputUpsertKeys;

Review Comment:
   TableSemantics is per-input: users access it via
callContext.getTableSemantics(pos), and partitionByColumns() / changelogMode() /
timeColumn() are already scoped to one input.
   
   upsertKeyColumns() breaks the pattern: StreamPhysicalProcessTableFunction
does .flatMap(List::stream) across all inputs, and
OperatorBindingCallContext.getTableSemantics hands the whole flat list to every
position. Single-input TO_CHANGELOG hides this (only one input, both numbers
are 1), but any multi-input PTF would see other inputs' keys mixed in. The field
should be List<List<int[]>> indexed by input position, then sliced like
inputChangelogModes.get(tableArgCall.getInputIndex()).
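
   A minimal sketch of the difference, using plain Java collections rather than the actual Flink classes. The names `UpsertKeySlicing`, `flatKeys`, and `keysForInput` are hypothetical and exist only for this illustration; the key values are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class UpsertKeySlicing {

    // Flattened shape: keys of all inputs end up in one list, so the
    // information about which input a key belongs to is lost.
    static List<int[]> flatKeys(List<List<int[]>> keysPerInput) {
        return keysPerInput.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    // Per-input shape: the outer list is indexed by input position, so each
    // table argument only sees its own keys.
    static List<int[]> keysForInput(List<List<int[]>> keysPerInput, int inputIndex) {
        return keysPerInput.get(inputIndex);
    }

    public static void main(String[] args) {
        List<List<int[]>> keysPerInput =
                Arrays.asList(
                        Arrays.asList(new int[] {0}), // input 0: one key on column 0
                        Arrays.asList(new int[] {1, 2})); // input 1: one key on columns 1 and 2

        // The flat list mixes both inputs' keys together.
        System.out.println(flatKeys(keysPerInput).size()); // 2
        // Slicing by position returns only input 0's key.
        System.out.println(keysForInput(keysPerInput, 0).size()); // 1
    }
}
```

   With a single input both calls return the same single key, which is why the problem would not show up in a single-input test.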



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
