raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3310127287
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,48 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The behavior depends on whether `PARTITION BY` is used (set semantics) or not (row semantics).
+
+**With `produces_full_deletes => true`.** The planner requires the input to produce DELETE rows with all columns populated. For upsert sources, a `ChangelogNormalize` operator is inserted to materialize the full pre-image from state. The function then emits fully-populated DELETE rows downstream.
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE upsert_source,
+ produces_full_deletes => true
+)
+```
+
+**With `produces_full_deletes => false` (default).** The planner does not require fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes (e.g. Kafka compacted topics), this avoids the stateful `ChangelogNormalize` operator that would otherwise materialize the full pre-image of each deleted row.
+
+In **row semantics** (no `PARTITION BY`), the function preserves the upsert key columns of the input on DELETE rows and nulls the rest. The upsert key is the one the planner already derives from the input (typically a declared `PRIMARY KEY`), so no `PARTITION BY` is required to get partial DELETEs.
+
+```sql
+-- Source 't' declares PRIMARY KEY (id). Source emits -D[id:5, name:'Alice'] (full pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE t)
+```
+
+When the input has no derivable upsert key (e.g. a pure append-only source, or an upstream operator that erased the key), there is no identifying column to preserve. The function then passes the input through unchanged.
+
+```sql
+-- Source emits -D[id:5] (key-only, no declared key).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+```
Review Comment:
I rewrote the docs on deletion, explaining each of the following combinations:
<table>
<tr>
<td> / </td>
<td> <b>produces_full_deletes=true</b> </td>
<td> <b>produces_full_deletes=false</b> </td>
</tr>
<tr>
<td> <b>Set semantics (PARTITION BY)</b> </td>
<td> Source emits upsert/retract + whether ChangelogNormalize is needed + output </td>
<td> Source emits upsert/retract + whether ChangelogNormalize is needed + output </td>
</tr>
<tr>
<td> <b>Row semantics (no PARTITION BY)</b> </td>
<td> Source emits upsert/retract + whether ChangelogNormalize is needed + output </td>
<td> Source emits upsert/retract + whether ChangelogNormalize is needed + output </td>
</tr>
</table>
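
To make the row-semantics cell for `produces_full_deletes=false` concrete, here is a minimal Python sketch of the rule as the quoted docs describe it: keep the upsert key columns on a DELETE row, null the rest, and pass the row through unchanged when no upsert key can be derived. This only models the documented behavior on plain dicts and is not Flink code. The function name `to_partial_delete` and the dict row representation are hypothetical.

```python
from typing import Any, Dict, List, Optional


def to_partial_delete(row: Dict[str, Any],
                      upsert_key: Optional[List[str]]) -> Dict[str, Any]:
    """Model a DELETE row in row semantics with produces_full_deletes => false.

    `row` is the incoming DELETE row, `upsert_key` the key columns the planner
    derived from the input (None or empty if no key is derivable).
    """
    if not upsert_key:
        # No derivable upsert key: there is no identifying column to
        # preserve, so the row is passed through unchanged.
        return {"op": "DELETE", **row}
    key = set(upsert_key)
    # Keep the key columns, null every other column.
    partial = {col: (val if col in key else None) for col, val in row.items()}
    return {"op": "DELETE", **partial}


# Source declares PRIMARY KEY (id) and emits -D[id:5, name:'Alice'].
print(to_partial_delete({"id": 5, "name": "Alice"}, ["id"]))
# Source emits a key-only -D[id:5] and no key is derivable.
print(to_partial_delete({"id": 5, "name": None}, None))
```

Both calls yield a DELETE row with `id` 5 and `name` null, matching the two row-semantics examples in the diff.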