twalthr commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3316605228


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).

Review Comment:
   ```suggestion
   * The grouping columns of an upstream `GROUP BY <key>` or `PARTITION BY 
<key>`.
   * The keys propagated by operators that preserve them (e.g. lookup joins, 
projections that keep the key columns).
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.

Review Comment:
   ```suggestion
   The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to calculate the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.

Review Comment:
   I would drop this paragraph. IMHO, it doesn't contain any interesting
   information.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).

Review Comment:
   I think this would make it clearer.
   ```suggestion
   -- Upsert source: -D[id:5, name:null] (key-only).
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an [upsert key](#upsert-key) to be present for the input table (row 
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected 
with a validation error.

Review Comment:
   ```suggestion
   The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. This 
requires an [upsert key](#upsert-key) to be present for the input table (row 
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected 
with a validation error.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -46,39 +42,30 @@
 @Internal
 public final class ToChangelogTypeStrategy {
 
-    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+    // Positional argument indexes for TO_CHANGELOG. Must match the order of 
StaticArguments
+    // registered in BuiltInFunctionDefinitions#TO_CHANGELOG; changing one 
without the other
+    // silently breaks argument resolution.
+    public static final int ARG_TABLE = 0;

Review Comment:
   ```suggestion
       public static final int ARG_INPUT = 0;
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +129,29 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key candidates derived from the passed table's metadata.
+     *
+     * <p>Returns a list of 0-based column index arrays that uniquely identify 
a row for upsert
+     * semantics. This is distinct from {@link #partitionByColumns()}: 
partition keys describe
+     * distribution and co-location, upsert keys describe row identity. Useful 
for functions that
+     * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER 
pairs, or want to have a
+     * unique identifier to interact with state.
+     *
+     * <p>Returns an empty list when no upsert key is derivable, or when the 
planner has not yet

Review Comment:
   Instead of this much text, an example might be more useful here to show the
   difference between `PARTITION BY` and the upsert key.
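
One possible shape for such an example, as a minimal self-contained sketch rather than actual Flink API. The class and method names (`KeyKinds`, `groupByColumns`) are hypothetical, and the table layout `[region, id, name]` is assumed purely for illustration. Partitioning by `region` co-locates rows, while the upsert key `id` identifies a single row inside a partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration, not Flink API: partition key vs. upsert key. */
public class KeyKinds {

    /** Groups rows by the values found at the given 0-based column indexes. */
    public static Map<List<Object>, List<Object[]>> groupByColumns(
            List<Object[]> rows, int[] columns) {
        Map<List<Object>, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            List<Object> key = new ArrayList<>();
            for (int c : columns) {
                key.add(row[c]);
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumed schema: [region, id, name]
        List<Object[]> rows = Arrays.asList(
                new Object[] {"EU", 1, "Alice"},
                new Object[] {"EU", 2, "Bob"},
                new Object[] {"US", 3, "Carol"});

        int[] partitionBy = {0}; // PARTITION BY region: distribution and co-location
        int[] upsertKey = {1};   // upsert key id: row identity

        // Two partitions; the "EU" partition holds two distinct rows.
        System.out.println(groupByColumns(rows, partitionBy).size());
        // Three groups, each holding exactly one row.
        System.out.println(groupByColumns(rows, upsertKey).size());
    }
}
```

In this sketch a partition may contain many rows, whereas each upsert key value maps to at most one row, which is the distinction the Javadoc tries to describe in prose.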



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +129,29 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key candidates derived from the passed table's metadata.
+     *
+     * <p>Returns a list of 0-based column index arrays that uniquely identify 
a row for upsert
+     * semantics. This is distinct from {@link #partitionByColumns()}: 
partition keys describe
+     * distribution and co-location, upsert keys describe row identity. Useful 
for functions that
+     * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER 
pairs, or want to have a
+     * unique identifier to interact with state.
+     *
+     * <p>Returns an empty list when no upsert key is derivable, or when the 
planner has not yet
+     * computed metadata (during type inference).
+     *
+     * <p>When the planner derives multiple candidate upsert keys for the same 
input (e.g., a table
+     * with several primary key constraints), all of them are returned. 
Picking which candidate to

Review Comment:
   > several primary key constraints
   
   A table can have only one primary key constraint. Do we have a better
   example?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java:
##########
@@ -348,14 +348,29 @@ public CallContext toCallContext(
             @Nullable List<Integer> inputTimeColumns,
             @Nullable List<ChangelogMode> inputChangelogModes,
             @Nullable ChangelogMode outputChangelogMode) {
+        return toCallContext(
+                call, inputTimeColumns, inputChangelogModes, 
outputChangelogMode, null);
+    }
+
+    /**
+     * Variant that additionally exposes the call's input upsert keys. Used by 
the streaming codegen
+     * path so PTFs can specialize themselves on the input's row-identity 
information.
+     */
+    public CallContext toCallContext(

Review Comment:
   Is the old function still used? If not, we can drop the overload. I have
   seen Claude doing this regularly.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>> 
validateOpMappingKeys(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since
+        // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
+        // check below only needs the literal op_mapping argument, so it lives 
here. Only runs
+        // when the user explicitly set produces_full_deletes=true; the 
default true is not
+        // validated since it is a safe no-op for any input.
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) 
opMapping.get())) {

Review Comment:
   I find this check a bit confusing. If I set `produces_full_deletes => true`,
   I'm allowed to NOT contain DELETE, but if I set `produces_full_deletes =>
   false`, I'm not allowed to. Either remove this check altogether, or use
   `produces_partial_deletes => false`, which defaults to false, as would be
   common for optional arguments.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +598,192 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY =
+            TableTestProgram.of(
+                            
"to-changelog-invalid-produces-full-deletes-for-append-only",
+                            "fails when produces_full_deletes=true on an input 
that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never 
emits DELETE rows")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Full vs partial deletes matrix (input kind x PARTITION BY x 
produces_full_deletes)
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-retract-produces-partial-deletes",
+                            "retract input in row semantics with 
produces_full_deletes=false: skips ChangelogNormalize and the partial DELETE 
row from the input passes through unchanged")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")

Review Comment:
   Add a test with NOT NULL on input columns:
   ```suggestion
                                               "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT NOT NULL")
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -310,17 +326,21 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
 
         final int timeColumn = 
inputTimeColumns.get(tableArgCall.getInputIndex());
 
+        final int inputIndex = tableArgCall.getInputIndex();

Review Comment:
   Put this variable higher and reuse it in the line above, which currently
   calls `tableArgCall.getInputIndex()` a second time.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.

Review Comment:
   ```suggestion
   -- ChangelogNormalize calculates the full pre-image from state.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an [upsert key](#upsert-key) to be present for the input table (row 
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected 
with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY` when directly reading from a 
source or the key provided in a `GROUP BY <key>`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes 
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full 
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, 
produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. The key used as the 
partition-key column should be the unique key that will be used as the record 
identifier. This matches the shape expected by upsert sinks and Kafka compacted 
topics.

Review Comment:
   ```suggestion
   **Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. The key used as the 
partition-key column should be the unique key that will be used as the record 
identifier. This matches the shape expected by upsert sinks such as Kafka 
compacted topics.
   ```
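
To make the "preserve the key, null the rest" behavior concrete, here is a minimal sketch of how a partial DELETE row could be shaped. This is not the Flink implementation: the class and method names (`PartialDelete`, `keepKeyColumns`) are hypothetical, and rows are modeled as plain `Object[]` with an assumed schema `[id, name]`.

```java
import java.util.Arrays;

/** Hypothetical illustration, not Flink code: shaping a partial DELETE row. */
public class PartialDelete {

    /** Returns a copy of the row that keeps only the key columns and nulls the rest. */
    public static Object[] keepKeyColumns(Object[] row, int[] keyColumns) {
        Object[] result = new Object[row.length]; // all positions start as null
        for (int c : keyColumns) {
            result[c] = row[c];
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed schema: [id, name]; the key is column 0 (id).
        Object[] fullDelete = {5, "Alice"};
        Object[] partial = keepKeyColumns(fullDelete, new int[] {0});
        System.out.println(Arrays.toString(partial)); // [5, null]
    }
}
```

The same helper covers both cases described in the docs: in row semantics the key columns would come from the planner-derived upsert key, in set semantics from the `PARTITION BY` columns.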



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java:
##########
@@ -781,6 +781,11 @@ public Optional<ChangelogMode> changelogMode() {
             return Optional.empty();
         }
 
+        @Override
+        public List<int[]> upsertKeyColumns() {

Review Comment:
   Usually we use an array of arrays for index paths. All connector interfaces
   do it this way.
   ```suggestion
           public int[][] upsertKeyColumns() {
   ```
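
If the return type changes as suggested, call sites that currently build a `List<int[]>` would need a conversion. A minimal sketch using only the JDK, with the helper class and method names (`UpsertKeyShapes`, `toArrayOfArrays`) made up for illustration:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: converts List<int[]> to int[][]. */
public class UpsertKeyShapes {

    /** Copies each candidate key so callers cannot mutate the arrays held by the list. */
    public static int[][] toArrayOfArrays(List<int[]> keys) {
        int[][] result = new int[keys.size()][];
        for (int i = 0; i < keys.size(); i++) {
            result[i] = keys.get(i).clone();
        }
        return result;
    }

    public static void main(String[] args) {
        // Two candidate upsert keys: column 0 alone, or columns 1 and 2 together.
        List<int[]> keys = Arrays.asList(new int[] {0}, new int[] {1, 2});
        int[][] asArray = toArrayOfArrays(keys);
        System.out.println(Arrays.deepToString(asArray)); // [[0], [1, 2]]
    }
}
```

An empty list maps to an empty outer array, which keeps the "no upsert key derivable" case representable without `null`.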



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,19 @@ public interface PartitionedTable {
      *         descriptor("deleted").asArgument("op"),
      *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      *     );
+     *
+     * // Opt out of full-delete semantics. When `true` (default), DELETE rows 
carry the full
+     * // pre-image. When `false`, only the identifying key columns are 
preserved and the rest
+     * // are nulled. See [Full vs partial deletes](
+     * // 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)

Review Comment:
   Links like this in code become outdated rather quickly.
   ```suggestion
        * in documentation.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>> 
validateOpMappingKeys(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since

Review Comment:
   ```suggestion
           // The check against the input changelog mode lives in the runtime 
function constructor since
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
