raminqaf opened a new pull request, #28235:
URL: https://github.com/apache/flink/pull/28235
## What is the purpose of the change
Adds the built-in `TO_CHANGELOG` process table function (PTF) introduced in
FLIP-564. The function converts a table (insert-only, retract, or upsert) into
a changelog view that surfaces the row kind (`INSERT`, `UPDATE_BEFORE`,
`UPDATE_AFTER`, `DELETE`) as an explicit `op` column.
A new `produces_full_deletes` boolean parameter controls how DELETE rows are
emitted. When `false` (default), the function emits partial DELETEs: the
upsert-key columns of the input table are preserved and the remaining columns
are nulled. When `true`, full DELETE rows are passed through unchanged. Partial
deletes match the contract of most upsert sinks and avoid forcing users to
retain full pre-image state.
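
The difference between partial and full DELETE rows can be illustrated with a minimal, Flink-free sketch. It is not the actual `ToChangelogFunction` code: the class `PartialDeleteSketch`, the method `toChangelogRow`, and the plain `Object[]` row representation are hypothetical and chosen only for illustration. The sketch assumes the `op` value is prepended as the first output column.

```java
import java.util.Arrays;

/** Hypothetical illustration only; not the ToChangelogFunction implementation. */
final class PartialDeleteSketch {

    /**
     * Prepends the op name to the row. For a DELETE with producesFullDeletes == false,
     * only the upsert-key columns are copied and every other column stays null.
     */
    static Object[] toChangelogRow(
            String op, Object[] row, int[] upsertKey, boolean producesFullDeletes) {
        Object[] out = new Object[row.length + 1];
        out[0] = op;
        boolean partialDelete = "DELETE".equals(op) && !producesFullDeletes;
        if (!partialDelete) {
            // Inserts, updates, and full deletes pass all columns through unchanged.
            System.arraycopy(row, 0, out, 1, row.length);
            return out;
        }
        // Partial delete: keep the key columns, leave the remaining columns null.
        for (int keyIndex : upsertKey) {
            out[keyIndex + 1] = row[keyIndex];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {42, "alice", 9.5};
        int[] upsertKey = {0}; // assumed: column 0 is the upsert key
        System.out.println(Arrays.toString(toChangelogRow("DELETE", row, upsertKey, false)));
        System.out.println(Arrays.toString(toChangelogRow("DELETE", row, upsertKey, true)));
    }
}
```

With the default setting, a sink keyed on column 0 still receives everything it needs to delete the row, while the non-key columns carry no pre-image values.
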
## Brief change log
- Added `BuiltInFunctionDefinitions.TO_CHANGELOG` and the corresponding
`ToChangelogFunction` runtime in `flink-table-api-java`.
- Added `StreamPhysicalToChangelog` / `StreamExecToChangelog` exec nodes
wired through `FlinkRelMdChangelogMode` and `FlinkRelMdUpsertKeys` so the
planner picks the correct downstream changelog mode.
- Computed the input upsert key at planning time via
`FlinkRelMetadataQuery.getUpsertKeys(input)`, collapsed the result to a single
candidate with `UpsertKeyUtil.smallestKey(...)`, and threaded it through the
codegen path so the runtime knows which columns to keep on DELETE.
- Implemented row-semantics and set-semantics variants; `PARTITION BY` over
a set-semantics input forwards the partitioning column as the upsert key.
- Added compiled-plan JSON support for `StreamExecProcessTableFunction` (new
`inputUpsertKeys` field with a per-input empty-array default for back-compat
with older plans).
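
As a rough illustration of collapsing several upsert-key candidates into one, the sketch below picks the candidate with the fewest columns. It is an assumption-laden stand-in, not the code in `UpsertKeyUtil`: the class `SmallestKeySketch`, the `int[]` key representation, and the lexicographic tie-break are hypothetical, and the real utility may resolve ties differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration only; not Flink's UpsertKeyUtil. */
final class SmallestKeySketch {

    /**
     * Returns the candidate with the fewest columns. Ties are broken by
     * lexicographic order of the column indices (an assumption of this sketch).
     */
    static Optional<int[]> smallestKey(List<int[]> candidates) {
        int[] best = null;
        for (int[] candidate : candidates) {
            boolean shorter = best == null || candidate.length < best.length;
            boolean sameLengthButSmaller =
                    best != null
                            && candidate.length == best.length
                            && Arrays.compare(candidate, best) < 0;
            if (shorter || sameLengthButSmaller) {
                best = candidate;
            }
        }
        // Empty when the input has no derivable upsert key.
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<int[]> candidates = List.of(new int[] {0, 1}, new int[] {2}, new int[] {1});
        smallestKey(candidates).ifPresent(key -> System.out.println(Arrays.toString(key)));
    }
}
```

A deterministic choice matters here because the selected key is baked into the generated code and the compiled plan, so the same input should always yield the same key.
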
## Verifying this change
This change added tests and can be verified as follows:
- New unit tests for the planner pieces in `ToChangelogTest`.
- New end-to-end semantic tests in `ToChangelogSemanticTests` covering:
insert-only inputs, retract inputs with a derivable upsert key (partial
DELETE), retract inputs with `produces_full_deletes=true` (full DELETE
pass-through), `PARTITION BY` over a non-leading column, single-column upsert
key from the input PK, and row semantics where the upsert key comes from the PK
constraint rather than `PARTITION BY`.
- New restore test in `ToChangelogRestoreTest` that validates the compiled-plan
and savepoint round trip for a retract source.
- Validation tests for misuse: missing `input` argument, wrong argument
type, conflicting `PARTITION BY` and upsert key.
- Run with: `mvn -pl flink-table/flink-table-planner test
-Dtest='ToChangelogTest,ToChangelogSemanticTests,ToChangelogRestoreTest'`.
Result: 50 pass, 1 skip (existing savepoint stub).
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes (new `BuiltInFunctionDefinitions.TO_CHANGELOG`; new
compiled-plan JSON field on `StreamExecProcessTableFunction` with a back-compat
default)
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (the
`ToChangelogFunction` operator runs per record, but it only writes the row kind
plus a projected key on DELETE; no extra state)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs + JavaDocs (built-in
function docs page entry plus `BuiltInFunctionDefinitions.TO_CHANGELOG` JavaDoc)
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes
Generated-by: Claude Opus 4.7