gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3312237978
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -157,7 +157,7 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram UPSERT =
TableTestProgram.of(
"to-changelog-upsert-input",
- "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes")
+ "upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits partial deletes")
Review Comment:
The description says "emits partial deletes", but the test doesn't pass
produces_full_deletes, so it defaults to true and the output is
+I[DELETE, Bob, 20] (a full row). The original "and full deletes" was right.
```suggestion
"upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits full deletes")
```
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java:
##########
@@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() {
RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
/* passColumnsThrough */ false,
/* hasSetSemantics */ true,
- /* timeColumn */ 1);
+ /* timeColumn */ 1,
+ /* upsertKeyColumns */ new int[0]);
Review Comment:
I think this will not compile: the argument still uses the old type.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -121,6 +123,10 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE)
private final ChangelogMode outputChangelogMode;
+ @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private final List<int[]> inputUpsertKeys;
Review Comment:
TableSemantics is per-input: users access it via
callContext.getTableSemantics(pos), and partitionByColumns() / changelogMode() /
timeColumn() are already scoped to one input.
upsertKeyColumns() breaks that pattern: StreamPhysicalProcessTableFunction
does .flatMap(List::stream) across all inputs, and
OperatorBindingCallContext.getTableSemantics hands the whole flat list to every
position. Single-input TO_CHANGELOG hides this (with only one input, both
numbers are 1), but any multi-input PTF would see other inputs' keys mixed in.
The field should be List<List<int[]>> indexed by input position, then sliced
like inputChangelogModes.get(tableArgCall.getInputIndex()).
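A minimal, standalone sketch of the difference, not the actual Flink code: the
class PerInputUpsertKeys and the methods flattened() and forInput() are
hypothetical names used only for illustration, and the key values are made up.
It shows why a flat list leaks other inputs' keys while a per-input list can be
sliced by position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
public class PerInputUpsertKeys {

    /** Merges the keys of all inputs into one list, losing the input position. */
    static List<int[]> flattened(List<List<int[]>> keysPerInput) {
        List<int[]> all = new ArrayList<>();
        for (List<int[]> keys : keysPerInput) {
            all.addAll(keys);
        }
        return all;
    }

    /** Returns only the upsert keys that belong to the input at inputIndex. */
    static List<int[]> forInput(List<List<int[]>> keysPerInput, int inputIndex) {
        return keysPerInput.get(inputIndex);
    }

    public static void main(String[] args) {
        // Assumed example: input 0 has key {0}, input 1 has key {1, 2}.
        List<List<int[]>> keysPerInput =
                List.of(List.of(new int[] {0}), List.of(new int[] {1, 2}));

        // Flat view: every input would be handed the keys of both inputs.
        System.out.println("flat key count: " + flattened(keysPerInput).size());

        // Per-input view: each position sees only its own keys.
        for (int i = 0; i < keysPerInput.size(); i++) {
            System.out.println(
                    "input " + i + " key count: " + forInput(keysPerInput, i).size());
        }
    }
}
```

With a single input the two views contain the same keys, which is why the
problem would only surface for a multi-input PTF.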