raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311155353


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); 
otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes 
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full 
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, 
produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. This matches the shape 
expected by upsert sinks and Kafka compacted topics.

Review Comment:
   done



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog 
events into a downstream s
 SELECT * FROM TO_CHANGELOG(
   input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
-  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
+  [produces_full_deletes => BOOLEAN]
 )
 ```
 
 ### Parameters
 
-| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                      |
-|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input`      | Yes      | The input table. With `PARTITION BY`, rows with 
the same key are co-located and run in the same operator instance. Without 
`PARTITION BY`, each row is processed independently. Accepts insert-only, 
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key 
should match or be a subset of the upsert key of the subquery.                  
                    |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`.                                        
                                                                                
                                                                                
                                                        |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
+| Parameter               | Required | Description                             
                                                                                
                                                                                
                                                                                
                                                                 |
+|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`                 | Yes      | The input table. With `PARTITION BY`, 
rows with the same key are co-located and run in the same operator instance. 
Without `PARTITION BY`, each row is processed independently. Accepts 
insert-only, retract, and upsert tables. For upsert tables, the provided 
`PARTITION BY` key should match or be a subset of the upsert key of the 
subquery.       |
+| `op`                    | No       | A `DESCRIPTOR` with a single column 
name for the operation code column. Defaults to `op`.                           
                                                                                
                                                                                
                                                                     |
+| `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change 
operation names to custom output codes. Keys can contain comma-separated names 
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). 
When provided, only mapped operations are forwarded - unmapped events are 
dropped. Each change operation may appear at most once across all entries. |
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), the function requires 
fully-populated DELETE rows from the input. The planner inserts a 
`ChangelogNormalize` operator for upsert sources that emit key-only deletes, so 
downstream sees the full pre-image on DELETE. When `false`, the function 
instead emits partial DELETE rows: in row semantics it preserves the 
planner-derived upsert key columns of the input and nulls the rest; in set 
semantics (`PARTITION BY`) it preserves the partition key and nulls the rest. 
Requires that the input declares an upsert key or that the call uses `PARTITION 
BY`; otherwise the function has no identifying columns to preserve and the call 
is rejected. |

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to