gustavodemorais commented on code in PR #28235: URL: https://github.com/apache/flink/pull/28235#discussion_r3304882090
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,48 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The behavior depends on whether `PARTITION BY` is used (set semantics) or not (row semantics).
+
+**With `produces_full_deletes => true`.** The planner requires the input to produce DELETE rows with all columns populated. For upsert sources, a `ChangelogNormalize` operator is inserted to materialize the full pre-image from state. The function then emits fully-populated DELETE rows downstream.
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE upsert_source,
+  produces_full_deletes => true
+)
+```
+
+**With `produces_full_deletes => false` (default).** The planner does not require fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes (e.g. Kafka compacted topics), this avoids the stateful `ChangelogNormalize` operator that would otherwise materialize the full pre-image of each deleted row.
+
+In **row semantics** (no `PARTITION BY`), the function preserves the upsert key columns of the input on DELETE rows and nulls the rest. The upsert key is the one the planner already derives from the input (typically a declared `PRIMARY KEY`), so no `PARTITION BY` is required to get partial DELETEs.
+
+```sql
+-- Source 't' declares PRIMARY KEY (id). Source emits -D[id:5, name:'Alice'] (full pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE t)
+```
+
+When the input has no derivable upsert key (e.g. a pure append-only source, or an upstream operator that erased the key), there is no identifying column to preserve. The function then passes the input through unchanged.
+
+```sql
+-- Source emits -D[id:5] (key-only, no declared key).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+```

Review Comment:
The example does not match what you want to show. The text says the function passes the input through unchanged, but the example output has name:null.
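To make the point concrete, here is a minimal Python sketch of the three cases the quoted section describes in row semantics. It is a toy model of the documented behavior as I read it, not Flink's implementation: `model_delete_output`, the dict-based row representation, and the `upsert_key` parameter are hypothetical names introduced only for illustration.

```python
from typing import Any, Dict, Optional, Sequence

Row = Dict[str, Any]


def model_delete_output(
    row: Row,
    upsert_key: Optional[Sequence[str]],
    produces_full_deletes: bool = False,
) -> Row:
    """Toy model of the DELETE handling described in the quoted docs.

    `row` is the DELETE row as it arrives at the function, and
    `upsert_key` is the key the planner derived (None if there is none).
    """
    if produces_full_deletes:
        # Assumption: the input is already fully populated
        # (e.g. by an upstream ChangelogNormalize), so emit it as-is.
        payload = dict(row)
    elif upsert_key:
        # Keep the upsert key columns and null every other column.
        payload = {c: (v if c in upsert_key else None) for c, v in row.items()}
    else:
        # No derivable upsert key: pass the input through unchanged.
        payload = dict(row)
    return {"op": "DELETE", **payload}


full_row = {"id": 5, "name": "Alice"}
key_only_row = {"id": 5, "name": None}

print(model_delete_output(full_row, ["id"], produces_full_deletes=True))
print(model_delete_output(full_row, ["id"]))
print(model_delete_output(key_only_row, None))
print(model_delete_output(full_row, None))
```

In this model a key-only input yields `name: None` on both the "null the rest" path and the pass-through path, so such an example cannot tell the two apart. An input that carries a full pre-image and has no derivable key (the last call) would show the pass-through behavior unambiguously, assuming the prose describes what the function actually does.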
