The GitHub Actions job "Nightly (beta)" on flink.git/master has failed.
Run started by GitHub user github-actions[bot] (triggered by 
github-actions[bot]).

Head commit for run:
4644cabd8bfed1104a6a1dbea854fe45a90877cd / Ramin Gharib <[email protected]>
[FLINK-39538][table] Support upsert output mode for FROM_CHANGELOG (#28164)

FROM_CHANGELOG now emits an upsert changelog (INSERT, UPDATE_AFTER, full 
DELETE) when the input table is partitioned (set semantics via PARTITION BY) 
and the active op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE. The 
partition key acts as the upsert key. In all other cases the output remains a 
retract changelog.

An op_mapping that maps to UPDATE_AFTER but not UPDATE_BEFORE is rejected at 
validation time when PARTITION BY is absent, because upsert mode requires a key.
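
The decision rule described above can be sketched as follows. This is a 
minimal, self-contained model and not the planner code from the commit: the 
class FromChangelogModeSketch, the OutputMode enum, and the use of plain 
strings for row kinds are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of the output-mode decision; not Flink's actual classes. */
public class FromChangelogModeSketch {

    enum OutputMode { RETRACT, UPSERT }

    /**
     * @param mappedKinds row kinds the active op_mapping maps to (plain strings in this sketch)
     * @param partitionKeys columns named in PARTITION BY; empty when the input is not partitioned
     */
    static OutputMode decide(Set<String> mappedKinds, List<String> partitionKeys) {
        boolean updateAfterWithoutBefore =
                mappedKinds.contains("UPDATE_AFTER") && !mappedKinds.contains("UPDATE_BEFORE");

        // Every mapping that is not "UPDATE_AFTER without UPDATE_BEFORE" stays retract.
        if (!updateAfterWithoutBefore) {
            return OutputMode.RETRACT;
        }
        // Upsert needs a key; without PARTITION BY there is none, so validation fails.
        if (partitionKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "UPDATE_AFTER without UPDATE_BEFORE requires PARTITION BY");
        }
        // The partition key acts as the upsert key.
        return OutputMode.UPSERT;
    }
}
```

In this model a mapping that includes both UPDATE_BEFORE and UPDATE_AFTER 
yields RETRACT whether or not the input is partitioned.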

To enable the strategy to inspect the resolved op_mapping and the input table's 
partition keys, ChangelogFunction.ChangelogContext is extended with two default 
methods: getArgumentValue(int, Class) and getTableSemantics(int). Defaults 
return Optional.empty() to preserve source compatibility for existing 
implementations.
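
The source-compatibility argument can be illustrated with a small sketch. The 
interface below is a hypothetical stand-in for 
ChangelogFunction.ChangelogContext; the method names come from the text above, 
while the generic return types and the TableSemanticsSketch placeholder are 
assumptions rather than the actual Flink signatures.

```java
import java.util.Optional;

public class ContextDefaultsSketch {

    /** Hypothetical placeholder for the table-semantics type. */
    interface TableSemanticsSketch {}

    /** Hypothetical stand-in for ChangelogFunction.ChangelogContext. */
    interface ChangelogContextSketch {
        // New method with a default body: existing implementations need no change.
        default <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
            return Optional.empty();
        }

        // Second new method, also defaulting to "no information available".
        default Optional<TableSemanticsSketch> getTableSemantics(int pos) {
            return Optional.empty();
        }
    }

    /** An implementation written before the two methods existed; it overrides nothing. */
    static class LegacyContext implements ChangelogContextSketch {}

    /** Returns true when the legacy context reports no argument value. */
    static boolean legacySeesNoArgument() {
        return !new LegacyContext().getArgumentValue(0, String.class).isPresent();
    }
}
```

Because both methods carry default bodies, LegacyContext compiles unchanged, 
and callers simply observe an empty Optional.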

The planner-side wrapper in FlinkChangelogModeInferenceProgram delegates the 
two new methods to the underlying CallContext.

Upsert mode uses full deletes (ChangelogMode.upsert(false)) because the runtime 
forwards each input delete row with all fields populated; only the RowKind is 
rewritten. This matches the runtime's behavior and avoids forcing downstream 
operators to reconstruct rows from state.
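
A rough sketch of "only the RowKind is rewritten" follows. The Row class and 
the withKind helper are hypothetical and deliberately simplified; they are not 
Flink's RowData API. The point is only that the fields of the incoming delete 
row are forwarded untouched, so the emitted delete is a full delete.

```java
public class FullDeleteSketch {

    /** Hypothetical, simplified row: a kind label plus the field values. */
    static final class Row {
        final String kind;
        final Object[] fields;

        Row(String kind, Object[] fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    /** Returns a row with the same fields as the input but a different kind. */
    static Row withKind(Row input, String newKind) {
        // No field is cleared or looked up from state; the values pass through as-is.
        return new Row(newKind, input.fields);
    }
}
```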

The upsert key derivation in FlinkRelMdUniqueKeys.getPtfUniqueKeys already 
returns the partition columns when a PTF emits upsert, so no metadata changes 
are needed.

Report URL: https://github.com/apache/flink/actions/runs/26202801035

With regards,
GitHub Actions via GitBox
