gustavodemorais commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3310091406
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,18 @@ public interface PartitionedTable {
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
* );
+ *
+ * // Opt out of full-delete semantics. The default (true) requires
fully-populated DELETE
+ * // rows and inserts a ChangelogNormalize for upsert sources. When
false, the function
+ * // preserves the partition key on DELETE rows and nulls the rest, which
avoids the
+ * // stateful normalization operator upstream.
+ * Table result = table
+ * .partitionBy($("id"))
+ * .toChangelog(lit(false).asArgument("produces_full_deletes"));
Review Comment:
```suggestion
* // Opt out of full-delete semantics. When `true` (default), DELETE
rows carry the full pre-image.
* // When `false`, only the identifying key columns are preserved and
the rest are nulled.
* // See [Delete handling](#delete-handling) for more details.
* Table result = table
* .partitionBy($("id"))
* .toChangelog(lit(false).asArgument("produces_full_deletes"));
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics);
otherwise the call is rejected with a validation error.
Review Comment:
```suggestion
The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an upsert key to be present for the input table (row semantics) or
`PARTITION BY` (set semantics); otherwise the call is rejected with a
validation error.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics);
otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the
planner-derived upsert key columns on DELETE rows and nulls the rest. The
upsert key is typically a declared `PRIMARY KEY`.
Review Comment:
```suggestion
**Row semantics** (no `PARTITION BY`): the function preserves the
planner-derived upsert key columns on DELETE rows and nulls the rest. The
upsert key is typically a declared `PRIMARY KEY` when reading directly from a
source, or the key provided in a `GROUP BY <key>`.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics);
otherwise the call is rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the
planner-derived upsert key columns on DELETE rows and nulls the rest. The
upsert key is typically a declared `PRIMARY KEY`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source,
produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key
and nulls every non-partition-key column on DELETE rows. This matches the shape
expected by upsert sinks and Kafka compacted topics.
Review Comment:
```suggestion
**Set semantics** (`PARTITION BY`): the function preserves the partition key
and nulls every non-partition-key column on DELETE rows. The key used as the
partition-key column should be the unique key that will be used as the record
identifier. This matches the shape expected by upsert sinks and Kafka compacted
topics.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
Review Comment:
Nice documentation!
```suggestion
#### Full vs partial deletes
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog
events into a downstream s
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
- [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+ [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
+ [produces_full_deletes => BOOLEAN]
)
```
### Parameters
-| Parameter | Required | Description
|
-|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input` | Yes | The input table. With `PARTITION BY`, rows with
the same key are co-located and run in the same operator instance. Without
`PARTITION BY`, each row is processed independently. Accepts insert-only,
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key
should match or be a subset of the upsert key of the subquery.
|
-| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`.
|
-| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation
names to custom output codes. Keys can contain comma-separated names to map
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When
provided, only mapped operations are forwarded - unmapped events are dropped.
Each change operation may appear at most once across all entries. |
+| Parameter | Required | Description
|
+|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input` | Yes | The input table. With `PARTITION BY`,
rows with the same key are co-located and run in the same operator instance.
Without `PARTITION BY`, each row is processed independently. Accepts
insert-only, retract, and upsert tables. For upsert tables, the provided
`PARTITION BY` key should match or be a subset of the upsert key of the
subquery. |
+| `op` | No | A `DESCRIPTOR` with a single column
name for the operation code column. Defaults to `op`.
|
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change
operation names to custom output codes. Keys can contain comma-separated names
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`).
When provided, only mapped operations are forwarded - unmapped events are
dropped. Each change operation may appear at most once across all entries. |
+| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how
DELETE rows are emitted. When `true` (default), the function requires
fully-populated DELETE rows from the input. The planner inserts a
`ChangelogNormalize` operator for upsert sources that emit key-only deletes, so
downstream sees the full pre-image on DELETE. When `false`, the function
instead emits partial DELETE rows: in row semantics it preserves the
planner-derived upsert key columns of the input and nulls the rest; in set
semantics (`PARTITION BY`) it preserves the partition key and nulls the rest.
Requires that the input declares an upsert key or that the call uses `PARTITION
BY`; otherwise the function has no identifying columns to preserve and the call
is rejected. |
Review Comment:
Let's keep a short description here with only the information the user needs
to use it. Detailed information is in the section below.
```suggestion
| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how
DELETE rows are emitted. When `true` (default), DELETE rows carry all columns,
the full image. When `false`, only the identifying key columns are preserved
and the rest are nulled. See [Delete handling](#delete-handling) for more
details. |
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java:
##########
@@ -1459,9 +1459,19 @@ default TableResult executeInsert(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
* );
+ *
+ * // Opt out of full-delete semantics. The default (true) requires
fully-populated DELETE
+ * // rows and inserts a ChangelogNormalize for upsert sources. When
false, the function
+ * // emits partial DELETE rows: row semantics preserves the
planner-derived upsert key
+ * // columns and nulls the rest; set semantics preserves the partition
key. Requires an
+ * // upsert key or PARTITION BY; otherwise the call is rejected.
Review Comment:
```suggestion
*
* // Opt out of full-delete semantics. When `true` (default), DELETE
rows carry the full pre-image.
* // When `false`, only the identifying key columns are preserved and
the rest are nulled.
* // See [Delete handling](#delete-handling) for more details.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key columns derived from the passed table's metadata.
+ *
+ * <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
+ * distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
+ * co-location, upsert keys describe row identity. Useful for functions
that need to emit
+ * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route
CDC events without
+ * forcing the caller to repeat the key via {@code PARTITION BY}.
+ *
+ * <p>Returns an empty array when no upsert key is derivable (e.g., a pure
append-only source)
+ * or when the planner has not yet computed metadata (during type
inference).
+ *
+ * <p>If the planner derives multiple candidate upsert keys for the same
input (e.g., a table
+ * with several unique constraints), only one is returned. The planner
today picks the candidate
Review Comment:
```suggestion
* with several primary key constraints), only one is returned. The
planner today picks the candidate
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key columns derived from the passed table's metadata.
+ *
+ * <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
+ * distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
+ * co-location, upsert keys describe row identity. Useful for functions
that need to emit
+ * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route
CDC events without
+ * forcing the caller to repeat the key via {@code PARTITION BY}.
+ *
+ * <p>Returns an empty array when no upsert key is derivable (e.g., a pure
append-only source)
+ * or when the planner has not yet computed metadata (during type
inference).
+ *
+ * <p>If the planner derives multiple candidate upsert keys for the same
input (e.g., a table
+ * with several unique constraints), only one is returned. The planner
today picks the candidate
+ * with the smallest number of columns, but ties between equal-cardinality
keys are resolved by
+ * an implementation detail and may change across releases. Callers must
not rely on which
+ * candidate is returned when more than one exists.
Review Comment:
I don't think we can allow that. If the key that is returned changes between
versions, it can cause serious bugs in pipelines after migrations.
Imagine you store key1 -> val1 in state and, after a migration, you get a
deletion for key2 -> val2 (which was previously key1): we then corrupt results
and state forever. I think we have to settle on a specific way of picking the
upsert key and document it, e.g. shortest and first indexes.
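As a rough illustration of what "shortest and first indexes" could mean, here is a minimal sketch. `UpsertKeySelector`, `compareKeys`, and `pick` are hypothetical names, not existing Flink API, and the tie-break (lexicographic order of the column indices) is my assumption about how to read "first indexes":

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: picks one upsert key deterministically. */
public final class UpsertKeySelector {

    private UpsertKeySelector() {}

    /** Orders keys by column count first, then lexicographically by column index. */
    public static int compareKeys(int[] a, int[] b) {
        if (a.length != b.length) {
            return Integer.compare(a.length, b.length);
        }
        return Arrays.compare(a, b);
    }

    /**
     * Returns a copy of the smallest candidate under {@link #compareKeys}, or an empty array
     * when there are no candidates. The result does not depend on the order of the list.
     */
    public static int[] pick(List<int[]> candidates) {
        return candidates.stream()
                .min(UpsertKeySelector::compareKeys)
                .map(key -> key.clone())
                .orElseGet(() -> new int[0]);
    }
}
```

With candidates `{2}`, `{0, 1}`, and `{1}`, this returns `{1}` no matter how the planner happens to order them, which is the property we need for state to survive migrations.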
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>>
validateOpMappingKeys(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(1, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateProducesFullDeletes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isExplicit =
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+ if (!isExplicit) {
+ return Optional.empty();
+ }
+ if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'produces_full_deletes' argument must be a constant
BOOLEAN literal.");
+ }
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES,
Boolean.class).orElse(true);
+ if (!producesFullDeletes) {
+ return Optional.empty();
+ }
+ // The check against the input changelog mode lives in the function
constructor since
+ // TableSemantics#changelogMode() returns empty here at type-inference
time. The mapping
+ // check below only needs the literal op_mapping argument, so it lives
here. Only runs
+ // when the user explicitly set produces_full_deletes=true; the
default true is not
+ // validated since it is a safe no-op for any input.
+ final Optional<Map> opMapping =
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+ if (opMapping.isPresent() && !mapsDelete((Map<String, String>)
opMapping.get())) {
+ return callContext.fail(
+ throwOnFailure,
+ "Invalid 'produces_full_deletes' for TO_CHANGELOG: the
active 'op_mapping' "
+ + "does not map DELETE rows, so no DELETE rows are
emitted. Remove "
+ + "the 'produces_full_deletes' argument or add a
DELETE entry to "
+ + "'op_mapping'.");
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Returns {@code true} when at least one {@code op_mapping} key
references {@code DELETE}. Keys
+ * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the
user-facing contract.
+ */
+ private static boolean mapsDelete(final Map<String, String> opMapping) {
+ for (final String key : opMapping.keySet()) {
+ for (final String rawName : key.split(",")) {
+ if (DELETE.equals(rawName.trim())) {
Review Comment:
We are case-sensitive, right? Should we add this information to the error
message of opMapping?
https://github.com/apache/flink/pull/28235/changes#r3310307549
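To make the case-sensitivity concrete, here is a standalone sketch of the same matching logic. `MapsDeleteSketch` is a hypothetical class, and I am assuming the `DELETE` constant in the diff has the value `"DELETE"`:

```java
import java.util.Map;

/** Hypothetical standalone copy of the matching logic, for illustration only. */
public final class MapsDeleteSketch {

    // Assumption: mirrors the DELETE constant referenced in the diff.
    private static final String DELETE = "DELETE";

    private MapsDeleteSketch() {}

    /** Returns true when any (possibly comma-separated) key names DELETE exactly. */
    public static boolean mapsDelete(Map<String, String> opMapping) {
        for (String key : opMapping.keySet()) {
            for (String rawName : key.split(",")) {
                // String#equals is case-sensitive; only surrounding whitespace is trimmed.
                if (DELETE.equals(rawName.trim())) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Under that assumption, `Map.of("INSERT, DELETE", "X")` matches but `Map.of("insert, delete", "X")` does not, so a user who writes lowercase names would hit the "does not map DELETE rows" error without a hint about casing.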
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+ private static final DataType TABLE_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("score", DataTypes.BIGINT()));
+
+ private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+ private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING());
+
+ private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+ @Override
+ protected Stream<TestSpec> testData() {
+ return Stream.of(
+ // Valid: produces_full_deletes=true with default op_mapping
(includes DELETE)
+ TestSpec.forStrategy(
+ "Valid produces_full_deletes=true with default
mapping",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, null)
+ .calledWithLiteralAt(3, true)
Review Comment:
Can you use the constants you created for all the indexes in these tests?
```suggestion
.calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
```
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+ private static final DataType TABLE_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("score", DataTypes.BIGINT()));
+
+ private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+ private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING());
+
+ private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+ @Override
+ protected Stream<TestSpec> testData() {
+ return Stream.of(
+ // Valid: produces_full_deletes=true with default op_mapping
(includes DELETE)
+ TestSpec.forStrategy(
+ "Valid produces_full_deletes=true with default
mapping",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, null)
+ .calledWithLiteralAt(3, true)
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE),
+
+ // Valid: produces_full_deletes=true with op_mapping that
includes DELETE
+ TestSpec.forStrategy(
+ "Valid produces_full_deletes=true with
explicit DELETE mapping",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, Map.of("INSERT", "I",
"DELETE", "D"))
+ .calledWithLiteralAt(3, true)
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE),
+
+ // Valid: produces_full_deletes=true with comma-separated
DELETE key
+ TestSpec.forStrategy(
+ "Valid produces_full_deletes=true with
comma-separated DELETE",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X"))
+ .calledWithLiteralAt(3, true)
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE),
+
+ // Valid: produces_full_deletes=false with op_mapping that
omits DELETE
+ TestSpec.forStrategy(
+ "Valid produces_full_deletes=false with no
DELETE in mapping",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER",
"X"))
+ .calledWithLiteralAt(3, false)
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE),
+
+ // Error: produces_full_deletes=true with op_mapping that
strips DELETE
+ TestSpec.forStrategy(
+ "produces_full_deletes=true rejected when
op_mapping omits DELETE",
+ TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(
+ TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE,
BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER",
"X"))
+ .calledWithLiteralAt(3, true)
+ .expectErrorMessage(
+ "Invalid 'produces_full_deletes' for
TO_CHANGELOG: the active "
+ + "'op_mapping' does not map DELETE
rows"),
+
+ // Error: multi-column descriptor
Review Comment:
```suggestion
// Error: multi-column descriptor for `op`
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
ValidationException.class,
"Duplicate change operation: 'DELETE'")
.build();
+
+ public static final TableTestProgram
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+ TableTestProgram.of(
+
"to-changelog-produces-full-deletes-on-append-only-input",
+ "fails when produces_full_deletes=true on an input
that never emits DELETE rows")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => true)",
+ ValidationException.class,
+ "the input table only produces [INSERT] and never
emits DELETE rows")
+ .build();
+
+ //
--------------------------------------------------------------------------------------------
+ // Row semantics x delete handling matrix
+ //
--------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
Review Comment:
Let's rename the tests so they match the naming style we have:
...
RETRACT_PRODUCES_PARTIAL_DELETES
UPSERT_PRODUCES_FULL_DELETES
UPSERT_PRODUCES_PARTIAL_DELETES
RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES
RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES
UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES
...
This makes it easier for us to navigate the tests.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
ValidationException.class,
"Duplicate change operation: 'DELETE'")
.build();
+
+ public static final TableTestProgram
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
Review Comment:
Move this to the error validation tests section and rename it so it matches
the other tests.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,18 @@ public interface PartitionedTable {
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
* );
+ *
+ * // Opt out of full-delete semantics. The default (true) requires
fully-populated DELETE
+ * // rows and inserts a ChangelogNormalize for upsert sources. When
false, the function
+ * // preserves the partition key on DELETE rows and nulls the rest, which
avoids the
+ * // stateful normalization operator upstream.
+ * Table result = table
+ * .partitionBy($("id"))
+ * .toChangelog(lit(false).asArgument("produces_full_deletes"));
Review Comment:
Let's keep the API usage simple, with only the info the user needs. More
details can be found in the documentation.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key columns derived from the passed table's metadata.
+ *
+ * <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
+ * distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
+ * co-location, upsert keys describe row identity. Useful for functions
that need to emit
+ * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route
CDC events without
+ * forcing the caller to repeat the key via {@code PARTITION BY}.
+ *
+ * <p>Returns an empty array when no upsert key is derivable (e.g., a pure
append-only source)
+ * or when the planner has not yet computed metadata (during type
inference).
+ *
+ * <p>If the planner derives multiple candidate upsert keys for the same
input (e.g., a table
+ * with several unique constraints), only one is returned. The planner
today picks the candidate
+ * with the smallest number of columns, but ties between equal-cardinality
keys are resolved by
+ * an implementation detail and may change across releases. Callers must
not rely on which
+ * candidate is returned when more than one exists.
Review Comment:
This is another reason to pass down all upsert keys, so that users can
decide which one they want to use.
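For illustration, if all candidates were exposed, a caller could make the choice deterministic on its side. A minimal sketch, assuming a hypothetical `UpsertKeyChooser` helper that is not part of Flink or this PR, with each candidate given as an array of 0-based column indices:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper for illustration only; not a Flink API. */
public final class UpsertKeyChooser {

    // Fewest columns first; ties broken by comparing the index arrays
    // lexicographically so that the result never depends on input order.
    private static final Comparator<int[]> BY_SIZE_THEN_INDICES =
            Comparator.<int[]>comparingInt(k -> k.length)
                    .thenComparing((a, b) -> Arrays.compare(a, b));

    private UpsertKeyChooser() {}

    /** Returns the preferred candidate, or empty if there are no candidates. */
    public static Optional<int[]> choose(List<int[]> candidates) {
        return candidates.stream().min(BY_SIZE_THEN_INDICES);
    }
}
```

With the candidates `[2, 3]`, `[1]` and `[0]`, this picks `[0]`: the single-column keys win on size, and `[0]` sorts before `[1]`.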
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key columns derived from the passed table's metadata.
+ *
+ * <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
+ * distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
+ * co-location, upsert keys describe row identity. Useful for functions
that need to emit
+ * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route
CDC events without
+ * forcing the caller to repeat the key via {@code PARTITION BY}.
+ *
+ * <p>Returns an empty array when no upsert key is derivable (e.g., a pure
append-only source)
+ * or when the planner has not yet computed metadata (during type
inference).
Review Comment:
Append-only sources can actually have a primary key and an upsert key.
```suggestion
* <p>Returns an empty array when no upsert key is derivable
* or when the planner has not yet computed metadata (during type
inference).
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>>
validateOpMappingKeys(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(1, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateProducesFullDeletes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isExplicit =
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+ if (!isExplicit) {
+ return Optional.empty();
+ }
+ if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'produces_full_deletes' argument must be a constant
BOOLEAN literal.");
+ }
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES,
Boolean.class).orElse(true);
+ if (!producesFullDeletes) {
+ return Optional.empty();
+ }
+ // The check against the input changelog mode lives in the function
constructor since
+ // TableSemantics#changelogMode() returns empty here at type-inference
time. The mapping
+ // check below only needs the literal op_mapping argument, so it lives
here. Only runs
+ // when the user explicitly set produces_full_deletes=true; the
default true is not
+ // validated since it is a safe no-op for any input.
+ final Optional<Map> opMapping =
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+ if (opMapping.isPresent() && !mapsDelete((Map<String, String>)
opMapping.get())) {
+ return callContext.fail(
+ throwOnFailure,
+ "Invalid 'produces_full_deletes' for TO_CHANGELOG: the
active 'op_mapping' "
Review Comment:
nice
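The body of `mapsDelete` is not part of the quoted hunk. As a rough sketch of what such a check might look like, assuming `op_mapping` keys are comma-separated lists of change operation names (as in `"INSERT, UPDATE_AFTER"` in the Javadoc example) and using a hypothetical `OpMappingChecks` class:

```java
import java.util.Arrays;
import java.util.Map;

/** Hypothetical sketch; the actual mapsDelete in the PR may differ. */
public final class OpMappingChecks {

    private OpMappingChecks() {}

    /** Returns true if any key of the mapping lists the DELETE operation. */
    public static boolean mapsDelete(Map<String, String> opMapping) {
        return opMapping.keySet().stream()
                // A single key may name several operations, e.g. "INSERT, UPDATE_AFTER".
                .flatMap(key -> Arrays.stream(key.split(",")))
                .map(String::trim)
                // Case-sensitive comparison against the operation name.
                .anyMatch("DELETE"::equals);
    }
}
```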
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +128,28 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key columns derived from the passed table's metadata.
+ *
+ * <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
+ * distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
+ * co-location, upsert keys describe row identity. Useful for functions
that need to emit
+ * key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or route
CDC events without
+ * forcing the caller to repeat the key via {@code PARTITION BY}.
Review Comment:
```suggestion
* <p>Returns 0-based column indices that uniquely identify a row for
upsert semantics. This is
* distinct from {@link #partitionByColumns()}: partition keys describe
distribution and
* co-location, upsert keys describe row identity. Useful for functions
that need to emit
* key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want
to have a unique identifier to interact with state.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an upsert key (row semantics) or `PARTITION BY` (set semantics);
otherwise the call is rejected with a validation error.
Review Comment:
I don't think we have a good explanation of the upsert key in the docs. It'd
be good to add one and then link to it wherever we mention it for the
users.
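Such an explanation could include a small example of what "partial DELETE" means in terms of the key. A minimal sketch, assuming plain `Object[]` rows instead of Flink's row types and a hypothetical `PartialDeletes` helper:

```java
/** Hypothetical sketch using plain arrays; not the operator code from the PR. */
public final class PartialDeletes {

    private PartialDeletes() {}

    /**
     * Builds a partial DELETE row: the key columns are copied from the input,
     * every other column is left as null.
     */
    public static Object[] toPartialDelete(Object[] row, int[] keyIndices) {
        // A new Object[] starts out filled with nulls.
        Object[] result = new Object[row.length];
        for (int idx : keyIndices) {
            result[idx] = row[idx];
        }
        return result;
    }
}
```

For the row `["Alice", 10L]` with key index `0`, the result is `["Alice", null]`, which matches the `+I[DELETE, Alice, null]` expectation in `ROW_SEM_PARTIAL_DELETES_VIA_UPSERT_KEY`.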
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
Review Comment:
```suggestion
"Invalid target mapping for argument 'op_mapping'. "
+ "Unknown change operation: '%s'. Operations are case-sensitive. Valid
values are: %s.",
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
ValidationException.class,
"Duplicate change operation: 'DELETE'")
.build();
+
+ public static final TableTestProgram
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+ TableTestProgram.of(
+
"to-changelog-produces-full-deletes-on-append-only-input",
+ "fails when produces_full_deletes=true on an input
that never emits DELETE rows")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => true)",
+ ValidationException.class,
+ "the input table only produces [INSERT] and never
emits DELETE rows")
+ .build();
+
+ //
--------------------------------------------------------------------------------------------
+ // Row semantics x delete handling matrix
+ //
--------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
+ TableTestProgram.of(
+ "to-changelog-row-sem-partial-deletes",
+ "row semantics: produces_full_deletes=false skips
ChangelogNormalize and a partial DELETE row from the input passes through
unchanged")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedValues(
+ "+I[INSERT, Alice, 10]",
+ "+I[INSERT, Bob, 20]",
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, null]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => false)")
+ .build();
+
+ public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES =
+ TableTestProgram.of(
+ "to-changelog-row-sem-force-full-deletes",
+ "row semantics: produces_full_deletes=true forces
ChangelogNormalize to materialize the full DELETE row from an upsert source
emitting key-only deletes")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ // Key-only delete:
ChangelogNormalize fills the row.
+ Row.ofKind(RowKind.DELETE,
"Alice", null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedValues(
+ "+I[INSERT, Alice, 10]",
"+I[DELETE, Alice, 10]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => true)")
+ .build();
+
+ public static final TableTestProgram
ROW_SEM_PARTIAL_DELETES_VIA_UPSERT_KEY =
+ TableTestProgram.of(
+
"to-changelog-row-sem-partial-deletes-via-upsert-key",
+ "row semantics with single-column upsert key + "
+ + "produces_full_deletes=false: DELETE
preserves the key "
+ + "column and nulls the rest without
requiring PARTITION BY")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.DELETE,
"Alice", 10L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedValues(
+ "+I[INSERT, Alice, 10]",
"+I[DELETE, Alice, null]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => false)")
+ .build();
+
+ //
--------------------------------------------------------------------------------------------
+ // Set semantics x delete handling matrix
+ //
--------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES =
+ TableTestProgram.of(
+ "to-changelog-set-sem-force-partial-deletes",
+ "set semantics: produces_full_deletes=false nulls
non-partition-key columns on DELETE even when the input row is fully populated")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "op STRING",
"score BIGINT")
+ .consumedValues(
+ "+I[Alice, INSERT, 10]",
+ "+I[Bob, INSERT, 20]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, null]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name,"
+ + "produces_full_deletes => false)")
+ .build();
+
+ public static final TableTestProgram SET_SEM_PARTIAL_DELETES =
+ TableTestProgram.of(
+ "to-changelog-set-sem-partial-deletes",
+ "set semantics: produces_full_deletes=false
(default) lets a partial DELETE row from the input pass through with
non-partition-key columns null")
Review Comment:
This is wrong: the default is true. Is this test passing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.