raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311153580
########## docs/content/docs/sql/reference/queries/changelog.md: ##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.

Review Comment:
Added a section for upsertKey

########## docs/content/docs/sql/reference/queries/changelog.md: ##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
+**Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY`.

Review Comment:
added
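For readers less familiar with `ChangelogNormalize`, the difference between the two `produces_full_deletes` modes can be modeled in a few lines. This is a minimal Python sketch, not Flink code: `delete_rows`, the `"+U"`/`"-D"` row-kind strings, and the dict-based rows are hypothetical. It covers row semantics only, and it assumes that a key-only delete arriving for a key with no stored pre-image is dropped.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Optional[object]]


def delete_rows(
    changes: Sequence[Tuple[str, Row]],
    columns: Sequence[str],
    key: str,
    produces_full_deletes: bool = True,
) -> List[Row]:
    """Hypothetical model of the DELETE rows emitted for a key-only upsert
    source under row semantics. Not Flink's implementation."""
    state: Dict[object, Row] = {}  # stands in for ChangelogNormalize keyed state
    out: List[Row] = []
    for kind, row in changes:
        k = row[key]
        if kind == "+U":
            # Only the full-deletes mode keeps state; the partial mode is stateless.
            if produces_full_deletes:
                state[k] = dict(row)
        elif kind == "-D":
            if produces_full_deletes:
                # Materialize the full pre-image from state.
                pre_image = state.pop(k, None)
                if pre_image is not None:  # assumption: unknown-key deletes are dropped
                    out.append({"op": "DELETE", **pre_image})
            else:
                # Keep the upsert key column and null every other column.
                out.append({"op": "DELETE", **{c: (k if c == key else None) for c in columns}})
        else:
            raise ValueError(f"unsupported row kind: {kind}")
    return out


changes = [("+U", {"id": 5, "name": "Alice"}), ("-D", {"id": 5})]
print(delete_rows(changes, ["id", "name"], "id"))
# [{'op': 'DELETE', 'id': 5, 'name': 'Alice'}]
print(delete_rows(changes, ["id", "name"], "id", produces_full_deletes=False))
# [{'op': 'DELETE', 'id': 5, 'name': None}]
```

The partial mode never writes to `state`, which is the point of skipping the normalization operator; the trade-off is that downstream consumers only see the key columns on DELETE.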
