raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311155353
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. This matches the shape expected by upsert sinks and Kafka compacted topics.
Review Comment:
done
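
For reference, the quoted hunk ends before the set-semantics example. A minimal sketch of what that call might look like, written by analogy with the row-semantics examples above. The table name `upsert_source` is reused from those examples, and the output shape is an assumption inferred from the description (partition key kept, every other column nulled), not text taken from the PR:

```sql
-- Assumed upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
-- Expected shape per the description: +I[id:5, op:'DELETE', name:null]
SELECT * FROM TO_CHANGELOG(
  input => TABLE upsert_source PARTITION BY id,
  produces_full_deletes => false
)
```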
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog events into a downstream s
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
- [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+ [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
+ [produces_full_deletes => BOOLEAN]
)
```
### Parameters
-| Parameter    | Required | Description |
-|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input`      | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
+| Parameter               | Required | Description |
+|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`                 | Yes      | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
+| `op`                    | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
+| `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), the function requires fully-populated DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator for upsert sources that emit key-only deletes, so downstream sees the full pre-image on DELETE. When `false`, the function instead emits partial DELETE rows: in row semantics it preserves the planner-derived upsert key columns of the input and nulls the rest; in set semantics (`PARTITION BY`) it preserves the partition key and nulls the rest. Requires that the input declares an upsert key or that the call uses `PARTITION BY`; otherwise the function has no identifying columns to preserve and the call is rejected. |
Review Comment:
Done
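
To illustrate the updated syntax, here is a sketch of a call that sets every argument from the parameters table at once. The table `orders` and the columns `order_id` and `change_type` are hypothetical names chosen for illustration; only the argument names and the `MAP` key format come from the quoted docs:

```sql
-- Hypothetical upsert table `orders`, partitioned by `order_id`.
-- UPDATE_BEFORE is not in the mapping, so those events would be dropped.
SELECT * FROM TO_CHANGELOG(
  input => TABLE orders PARTITION BY order_id,
  op => DESCRIPTOR(change_type),
  op_mapping => MAP['INSERT, UPDATE_AFTER', 'U', 'DELETE', 'D'],
  produces_full_deletes => false
)
```

With `produces_full_deletes => false` and `PARTITION BY`, the description above implies that DELETE rows keep `order_id` and carry nulls in the remaining columns.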