gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3310091406


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,18 @@ public interface PartitionedTable {
      *         descriptor("deleted").asArgument("op"),
      *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      *     );
+     *
+     * // Opt out of full-delete semantics. The default (true) requires 
fully-populated DELETE
+     * // rows and inserts a ChangelogNormalize for upsert sources. When 
false, the function
+     * // preserves the partition key on DELETE rows and nulls the rest, which 
avoids the
+     * // stateful normalization operator upstream.
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(lit(false).asArgument("produces_full_deletes"));

Review Comment:
   ```suggestion
        * // Opt out of full-delete semantics. When `true` (default), DELETE 
rows carry the full pre-image.
        * // When `false`, only the identifying key columns are preserved and 
the rest are nulled. 
        * // See [Delete handling](#delete-handling) for more details.
        * Table result = table
        *     .partitionBy($("id"))
        *     .toChangelog(lit(false).asArgument("produces_full_deletes"));
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); 
otherwise the call is rejected with a validation error.

Review Comment:
   ```suggestion
   The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires the input table to have an upsert key (row semantics) or the call to 
use `PARTITION BY` (set semantics); otherwise the call is rejected with a 
validation error.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); 
otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY`.

Review Comment:
   ```suggestion
   **Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY` when reading directly from a 
source, or the key provided in a `GROUP BY <key>`.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); 
otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes 
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full 
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, 
produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. This matches the shape 
expected by upsert sinks and Kafka compacted topics.

Review Comment:
   ```suggestion
   **Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. The partition key 
should be the unique key that identifies the record. This matches the shape 
expected by upsert sinks and Kafka compacted topics.
   ```
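
To make the partial-DELETE shape concrete, here is a minimal sketch of "preserve the key columns, null the rest". It is plain Java over `Object[]` rows, not Flink's runtime code; `PartialDeleteSketch`, `toPartialDelete`, and `keyIndexes` are hypothetical names used only for illustration.

```java
// Hypothetical illustration only; not the TO_CHANGELOG implementation.
final class PartialDeleteSketch {

    /**
     * Returns a copy of {@code row} in which only the columns listed in
     * {@code keyIndexes} (0-based) keep their value; every other column is null.
     */
    static Object[] toPartialDelete(Object[] row, int[] keyIndexes) {
        // A freshly allocated Object[] is filled with nulls.
        Object[] partial = new Object[row.length];
        for (int index : keyIndexes) {
            partial[index] = row[index];
        }
        return partial;
    }
}
```

With a row `{5, "Alice"}` and key index `0`, this yields `{5, null}`, which matches the `id:5, name:null` output shown in the examples.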



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling

Review Comment:
   Nice documentation!
   
   ```suggestion
   #### Full vs partial deletes
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog 
events into a downstream s
 SELECT * FROM TO_CHANGELOG(
   input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
-  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
+  [produces_full_deletes => BOOLEAN]
 )
 ```
 
 ### Parameters
 
-| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                      |
-|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input`      | Yes      | The input table. With `PARTITION BY`, rows with 
the same key are co-located and run in the same operator instance. Without 
`PARTITION BY`, each row is processed independently. Accepts insert-only, 
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key 
should match or be a subset of the upsert key of the subquery.                  
                    |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`.                                        
                                                                                
                                                                                
                                                        |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
+| Parameter               | Required | Description                             
                                                                                
                                                                                
                                                                                
                                                                 |
+|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`                 | Yes      | The input table. With `PARTITION BY`, 
rows with the same key are co-located and run in the same operator instance. 
Without `PARTITION BY`, each row is processed independently. Accepts 
insert-only, retract, and upsert tables. For upsert tables, the provided 
`PARTITION BY` key should match or be a subset of the upsert key of the 
subquery.       |
+| `op`                    | No       | A `DESCRIPTOR` with a single column 
name for the operation code column. Defaults to `op`.                           
                                                                                
                                                                                
                                                                     |
+| `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change 
operation names to custom output codes. Keys can contain comma-separated names 
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). 
When provided, only mapped operations are forwarded - unmapped events are 
dropped. Each change operation may appear at most once across all entries. |
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), the function requires 
fully-populated DELETE rows from the input. The planner inserts a 
`ChangelogNormalize` operator for upsert sources that emit key-only deletes, so 
downstream sees the full pre-image on DELETE. When `false`, the function 
instead emits partial DELETE rows: in row semantics it preserves the 
planner-derived upsert key columns of the input and nulls the rest; in set 
semantics (`PARTITION BY`) it preserves the partition key and nulls the rest. 
Requires that the input declares an upsert key or that the call uses `PARTITION 
BY`; otherwise the function has no identifying columns to preserve and the call 
is rejected. |

Review Comment:
   Let's keep this to a short description with just the information the user 
needs to use the argument. The detailed information is in the section below.
   
   ```suggestion
   | `produces_full_deletes` | No | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), DELETE rows carry all columns 
(the full pre-image). When `false`, only the identifying key columns are preserved 
and the rest are nulled. See [Delete handling](#delete-handling) for more 
details. |
   
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java:
##########
@@ -1459,9 +1459,19 @@ default TableResult executeInsert(
      *     descriptor("deleted").asArgument("op"),
      *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      * );
+     *
+     * // Opt out of full-delete semantics. The default (true) requires 
fully-populated DELETE
+     * // rows and inserts a ChangelogNormalize for upsert sources. When 
false, the function
+     * // emits partial DELETE rows: row semantics preserves the 
planner-derived upsert key
+     * // columns and nulls the rest; set semantics preserves the partition 
key. Requires an
+     * // upsert key or PARTITION BY; otherwise the call is rejected.

Review Comment:
   ```suggestion
        *
        * // Opt out of full-delete semantics. When `true` (default), DELETE 
rows carry the full pre-image.
        * // When `false`, only the identifying key columns are preserved and 
the rest are nulled. 
        * // See [Delete handling](#delete-handling) for more details.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key columns derived from the passed table's metadata.
+     *
+     * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
+     * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
+     * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
+     * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route 
CDC events without
+     * forcing the caller to repeat the key via {@code PARTITION BY}.
+     *
+     * <p>Returns an empty array when no upsert key is derivable (e.g., a pure 
append-only source)
+     * or when the planner has not yet computed metadata (during type 
inference).
+     *
+     * <p>If the planner derives multiple candidate upsert keys for the same 
input (e.g., a table
+     * with several unique constraints), only one is returned. The planner 
today picks the candidate

Review Comment:
   ```suggestion
        * with several primary key constraints), only one is returned. The 
planner today picks the candidate
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key columns derived from the passed table's metadata.
+     *
+     * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
+     * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
+     * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
+     * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route 
CDC events without
+     * forcing the caller to repeat the key via {@code PARTITION BY}.
+     *
+     * <p>Returns an empty array when no upsert key is derivable (e.g., a pure 
append-only source)
+     * or when the planner has not yet computed metadata (during type 
inference).
+     *
+     * <p>If the planner derives multiple candidate upsert keys for the same 
input (e.g., a table
+     * with several unique constraints), only one is returned. The planner 
today picks the candidate
+     * with the smallest number of columns, but ties between equal-cardinality 
keys are resolved by
+     * an implementation detail and may change across releases. Callers must 
not rely on which
+     * candidate is returned when more than one exists.

Review Comment:
   I don't think we can allow that. If the key that is returned changes 
between versions, it can cause serious bugs in pipelines after migrations. 
Imagine state holds key1 -> val1 and, after a migration, a deletion arrives 
for key2 -> val2 (which was previously key1): results and state are then 
corrupted permanently. I think we have to settle on a specific way of picking 
the upsert key and document it, e.g. shortest, then first indexes.
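
One possible reading of the "shortest and first indexes" rule, sketched in plain Java. This is an assumption about what a deterministic pick could look like, not the planner's actual logic; `UpsertKeySelection` and `pickUpsertKey` are hypothetical names, and each candidate is assumed to be an ascending array of 0-based column indices.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; not the planner's upsert key derivation.
final class UpsertKeySelection {

    // Fewest columns first; ties are broken by lexicographic order of the indices.
    static final Comparator<int[]> SHORTEST_THEN_FIRST_INDEXES =
            Comparator.<int[]>comparingInt(key -> key.length)
                    .thenComparing((a, b) -> Arrays.compare(a, b));

    /** Picks one candidate deterministically, or empty when there are no candidates. */
    static Optional<int[]> pickUpsertKey(List<int[]> candidates) {
        return candidates.stream().min(SHORTEST_THEN_FIRST_INDEXES);
    }
}
```

Because the ordering is total over distinct candidates, the result does not depend on the order in which candidates are listed, so the same key would be picked before and after a migration as long as the candidate set itself is unchanged.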
   



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>> 
validateOpMappingKeys(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since
+        // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
+        // check below only needs the literal op_mapping argument, so it lives 
here. Only runs
+        // when the user explicitly set produces_full_deletes=true; the 
default true is not
+        // validated since it is a safe no-op for any input.
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) 
opMapping.get())) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
+                            + "does not map DELETE rows, so no DELETE rows are 
emitted. Remove "
+                            + "the 'produces_full_deletes' argument or add a 
DELETE entry to "
+                            + "'op_mapping'.");
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns {@code true} when at least one {@code op_mapping} key 
references {@code DELETE}. Keys
+     * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the 
user-facing contract.
+     */
+    private static boolean mapsDelete(final Map<String, String> opMapping) {
+        for (final String key : opMapping.keySet()) {
+            for (final String rawName : key.split(",")) {
+                if (DELETE.equals(rawName.trim())) {

Review Comment:
   We are case-sensitive, right? Should we add this information to the error 
message for `op_mapping`? See 
https://github.com/apache/flink/pull/28235/changes#r3310307549
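
A standalone sketch of the case-sensitivity point. `mapsDelete` mirrors the loop in the diff above; `hasCaseMismatchedDelete` is a hypothetical helper (not part of the PR) showing how a near miss such as `delete` could be detected, so that an error message can mention that names are case-sensitive.

```java
import java.util.Map;

// Hypothetical illustration only; class and second helper are not part of the PR.
final class OpMappingCaseCheck {

    static final String DELETE = "DELETE";

    /** Case-sensitive check, following the loop shown in the diff. */
    static boolean mapsDelete(Map<String, String> opMapping) {
        for (String key : opMapping.keySet()) {
            for (String rawName : key.split(",")) {
                if (DELETE.equals(rawName.trim())) {
                    return true;
                }
            }
        }
        return false;
    }

    /** True when some name equals DELETE only if case is ignored (e.g. "delete"). */
    static boolean hasCaseMismatchedDelete(Map<String, String> opMapping) {
        for (String key : opMapping.keySet()) {
            for (String rawName : key.split(",")) {
                String name = rawName.trim();
                if (!DELETE.equals(name) && DELETE.equalsIgnoreCase(name)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

For a mapping key such as `"insert, delete"`, `mapsDelete` returns `false` while `hasCaseMismatchedDelete` returns `true`, which is the situation where a hint about case-sensitivity would help the user.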



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    private static final DataType TABLE_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("score", DataTypes.BIGINT()));
+
+    private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+    private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
+
+    private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Valid: produces_full_deletes=true with default op_mapping 
(includes DELETE)
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with default 
mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, null)
+                        .calledWithLiteralAt(3, true)

Review Comment:
   Can you use the constants you created for all the indexes in these tests? 
   ```suggestion
                           .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
   ```



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    private static final DataType TABLE_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("score", DataTypes.BIGINT()));
+
+    private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+    private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
+
+    private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Valid: produces_full_deletes=true with default op_mapping 
(includes DELETE)
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with default 
mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, null)
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=true with op_mapping that 
includes DELETE
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with 
explicit DELETE mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT", "I", 
"DELETE", "D"))
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=true with comma-separated 
DELETE key
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with 
comma-separated DELETE",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X"))
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=false with op_mapping that 
omits DELETE
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=false with no 
DELETE in mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", 
"X"))
+                        .calledWithLiteralAt(3, false)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
+
+                // Error: produces_full_deletes=true with op_mapping that 
strips DELETE
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true rejected when 
op_mapping omits DELETE",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", 
"X"))
+                        .calledWithLiteralAt(3, true)
+                        .expectErrorMessage(
+                                "Invalid 'produces_full_deletes' for 
TO_CHANGELOG: the active "
+                                        + "'op_mapping' does not map DELETE 
rows"),
+
+                // Error: multi-column descriptor

Review Comment:
   ```suggestion
                   // Error: multi-column descriptor for `op`
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-on-append-only-input",
+                            "fails when produces_full_deletes=true on an input 
that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never 
emits DELETE rows")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Row semantics x delete handling matrix
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =

Review Comment:
   Let's rename the tests so they match the naming style we have:
   
   ...
   RETRACT_PRODUCES_PARTIAL_DELETES
   UPSERT_PRODUCES_FULL_DELETES
   UPSERT_PRODUCES_PARTIAL_DELETES
   RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES
   RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES
   UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES
   ...
   
   This makes the tests easier for us to navigate.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =

Review Comment:
   Move this to the error validation tests section and rename it so it matches the 
other tests.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,18 @@ public interface PartitionedTable {
      *         descriptor("deleted").asArgument("op"),
      *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      *     );
+     *
+     * // Opt out of full-delete semantics. The default (true) requires 
fully-populated DELETE
+     * // rows and inserts a ChangelogNormalize for upsert sources. When 
false, the function
+     * // preserves the partition key on DELETE rows and nulls the rest, which 
avoids the
+     * // stateful normalization operator upstream.
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(lit(false).asArgument("produces_full_deletes"));

Review Comment:
   Let's keep the API usage simple, with only the info the user needs. More 
details can be found in the documentation.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key columns derived from the passed table's metadata.
+     *
+     * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
+     * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
+     * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
+     * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route 
CDC events without
+     * forcing the caller to repeat the key via {@code PARTITION BY}.
+     *
+     * <p>Returns an empty array when no upsert key is derivable (e.g., a pure 
append-only source)
+     * or when the planner has not yet computed metadata (during type 
inference).
+     *
+     * <p>If the planner derives multiple candidate upsert keys for the same 
input (e.g., a table
+     * with several unique constraints), only one is returned. The planner 
today picks the candidate
+     * with the smallest number of columns, but ties between equal-cardinality 
keys are resolved by
+     * an implementation detail and may change across releases. Callers must 
not rely on which
+     * candidate is returned when more than one exists.

Review Comment:
   This would also be one of the reasons to pass down all upsert keys, so that 
users can decide which one they want to use.
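
   A minimal sketch of what that could look like from the caller's side. The class and method names below are hypothetical and are not part of `TableSemantics`; the only behavior taken from the Javadoc above is that the planner currently picks the candidate with the fewest columns.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch; none of these names exist in Flink. */
final class UpsertKeyChoiceSketch {

    /** Mirrors the behavior described in the Javadoc: pick the candidate with the fewest columns. */
    static Optional<int[]> smallestKey(List<int[]> candidates) {
        return candidates.stream().min(Comparator.comparingInt(k -> k.length));
    }

    /** What a caller could do if all candidates were exposed: take the first key containing a column. */
    static Optional<int[]> keyContaining(List<int[]> candidates, int column) {
        return candidates.stream()
                .filter(k -> Arrays.stream(k).anyMatch(c -> c == column))
                .findFirst();
    }

    public static void main(String[] args) {
        // Two assumed candidate upsert keys, given as 0-based column indices.
        List<int[]> candidates = List.of(new int[] {0, 1}, new int[] {2});
        System.out.println(Arrays.toString(smallestKey(candidates).orElseThrow()));
        System.out.println(Arrays.toString(keyContaining(candidates, 1).orElseThrow()));
    }
}
```

   With all candidates available, the function author chooses explicitly instead of depending on how the planner breaks ties.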



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key columns derived from the passed table's metadata.
+     *
+     * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
+     * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
+     * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
+     * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route 
CDC events without
+     * forcing the caller to repeat the key via {@code PARTITION BY}.
+     *
+     * <p>Returns an empty array when no upsert key is derivable (e.g., a pure 
append-only source)
+     * or when the planner has not yet computed metadata (during type 
inference).

Review Comment:
   Append-only sources can actually have a primary key and an upsert key.
   
   ```suggestion
        * <p>Returns an empty array when no upsert key is derivable
        * or when the planner has not yet computed metadata (during type 
inference).
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>> 
validateOpMappingKeys(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since
+        // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
+        // check below only needs the literal op_mapping argument, so it lives 
here. Only runs
+        // when the user explicitly set produces_full_deletes=true; the 
default true is not
+        // validated since it is a safe no-op for any input.
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) 
opMapping.get())) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "

Review Comment:
   nice



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
      */
     Optional<ChangelogMode> changelogMode();
 
+    /**
+     * Upsert key columns derived from the passed table's metadata.
+     *
+     * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
+     * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
+     * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
+     * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route 
CDC events without
+     * forcing the caller to repeat the key via {@code PARTITION BY}.

Review Comment:
   
   ```suggestion
        * <p>Returns 0-based column indices that uniquely identify a row for 
upsert semantics. This is
        * distinct from {@link #partitionByColumns()}: partition keys describe 
distribution and
        * co-location, upsert keys describe row identity. Useful for functions 
that need to emit
        * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want 
to have a unique identifier to interact with state.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The matrix below shows each 
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics); 
otherwise the call is rejected with a validation error.

Review Comment:
   I think we don't have a good explanation of the upsert key in the docs. It'd 
be good to add one and then link to it in the places where we mention the upsert 
key for users.
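
   For such an explanation, a small illustration of the partial-delete shape might help. This is only a sketch under assumptions: rows are modeled as plain `Object[]`, the helper name is made up, and it is not how the function is implemented in the PR. It shows the documented effect of `produces_full_deletes => false`: the key columns survive and everything else is nulled.

```java
import java.util.Arrays;

/** Hypothetical illustration; not the TO_CHANGELOG implementation. */
final class PartialDeleteSketch {

    /** Returns a copy of the row that keeps only the key columns; all other fields are null. */
    static Object[] toPartialDelete(Object[] row, int[] keyColumns) {
        Object[] out = new Object[row.length]; // all fields start as null
        for (int idx : keyColumns) {
            out[idx] = row[idx];
        }
        return out;
    }

    public static void main(String[] args) {
        // Schema assumed as (name STRING PRIMARY KEY, score BIGINT); the key is column 0.
        Object[] deleted = toPartialDelete(new Object[] {"Alice", 10L}, new int[] {0});
        System.out.println(Arrays.toString(deleted)); // prints [Alice, null]
    }
}
```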



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########


Review Comment:
   ```suggestion
   "Invalid target mapping for argument 'op_mapping'. "
   + "Unknown change operation: '%s'. Operations are case-sensitive. Valid 
values are: %s.",
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-on-append-only-input",
+                            "fails when produces_full_deletes=true on an input 
that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never 
emits DELETE rows")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Row semantics x delete handling matrix
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-partial-deletes",
+                            "row semantics: produces_full_deletes=false skips 
ChangelogNormalize and a partial DELETE row from the input passes through 
unchanged")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]",
+                                            "+I[INSERT, Bob, 20]",
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-force-full-deletes",
+                            "row semantics: produces_full_deletes=true forces 
ChangelogNormalize to materialize the full DELETE row from an upsert source 
emitting key-only deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            // Key-only delete: 
ChangelogNormalize fills the row.
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]", 
"+I[DELETE, Alice, 10]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)")
+                    .build();
+
+    public static final TableTestProgram 
ROW_SEM_PARTIAL_DELETES_VIA_UPSERT_KEY =
+            TableTestProgram.of(
+                            
"to-changelog-row-sem-partial-deletes-via-upsert-key",
+                            "row semantics with single-column upsert key + "
+                                    + "produces_full_deletes=false: DELETE 
preserves the key "
+                                    + "column and nulls the rest without 
requiring PARTITION BY")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]", 
"+I[DELETE, Alice, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Set semantics x delete handling matrix
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-force-partial-deletes",
+                            "set semantics: produces_full_deletes=false nulls 
non-partition-key columns on DELETE even when the input row is fully populated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name,"
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram SET_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-partial-deletes",
+                            "set semantics: produces_full_deletes=false 
(default) lets a partial DELETE row from the input pass through with 
non-partition-key columns null")

Review Comment:
   This is wrong: the default is true. Is this test passing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
