twalthr commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3316605228
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
Review Comment:
```suggestion
* The grouping columns of an upstream `GROUP BY <key>` or `PARTITION BY <key>`.
* The keys propagated by operators that preserve them (e.g. lookup joins, projections that keep the key columns).
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
When `false`, the function relies on the input table's [upsert
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
Review Comment:
```suggestion
The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to calculate the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
Review Comment:
I would drop this paragraph. IMHO it doesn't contain any interesting
information.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
When `false`, the function relies on the input table's [upsert
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
Review Comment:
I think this would make it clearer.
```suggestion
-- Upsert source: -D[id:5, name:null] (key-only).
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
When `false`, the function relies on the input table's [upsert
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an [upsert key](#upsert-key) to be present for the input table (row
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected
with a validation error.
Review Comment:
```suggestion
The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream. This
requires an [upsert key](#upsert-key) to be present for the input table (row
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected
with a validation error.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -46,39 +42,30 @@
@Internal
public final class ToChangelogTypeStrategy {
- private static final String DEFAULT_OP_COLUMN_NAME = "op";
+ // Positional argument indexes for TO_CHANGELOG. Must match the order of
StaticArguments
+ // registered in BuiltInFunctionDefinitions#TO_CHANGELOG; changing one
without the other
+ // silently breaks argument resolution.
+ public static final int ARG_TABLE = 0;
Review Comment:
```suggestion
public static final int ARG_INPUT = 0;
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +129,29 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key candidates derived from the passed table's metadata.
+ *
+ * <p>Returns a list of 0-based column index arrays that uniquely identify
a row for upsert
+ * semantics. This is distinct from {@link #partitionByColumns()}:
partition keys describe
+ * distribution and co-location, upsert keys describe row identity. Useful
for functions that
+ * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER
pairs, or want to have a
+ * unique identifier to interact with state.
+ *
+ * <p>Returns an empty list when no upsert key is derivable, or when the
planner has not yet
Review Comment:
Instead of this much text, an example might be more useful here to show the
difference between PARTITION BY and the upsert key.
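A minimal sketch of the kind of example meant here, assuming a table with columns `region`, `id`, and `name` that is partitioned by `region` while `id` is the upsert key. The class and method names are hypothetical and not part of Flink; the point is only that two rows can be co-located by the partition key and still be different rows by identity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names exist in Flink. */
final class PartitionVsUpsertKeySketch {

    /** Projects the given 0-based column indexes out of a row. */
    static List<Object> project(Object[] row, int[] indexes) {
        List<Object> key = new ArrayList<>();
        for (int index : indexes) {
            key.add(row[index]);
        }
        return key;
    }

    public static void main(String[] args) {
        // Assumed schema: 0 = region, 1 = id, 2 = name.
        int[] partitionBy = {0}; // describes distribution and co-location
        int[] upsertKey = {1};   // describes row identity

        Object[] alice = {"EU", 5, "Alice"};
        Object[] bob = {"EU", 7, "Bob"};

        // Both rows land in the same partition ...
        System.out.println(project(alice, partitionBy).equals(project(bob, partitionBy)));
        // ... but they are distinct rows according to the upsert key.
        System.out.println(project(alice, upsertKey).equals(project(bob, upsertKey)));
    }
}
```

A DELETE for `id = 5` therefore refers to exactly one row, even though `PARTITION BY region` groups it with every other `EU` row.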
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java:
##########
@@ -128,6 +129,29 @@ public interface TableSemantics {
*/
Optional<ChangelogMode> changelogMode();
+ /**
+ * Upsert key candidates derived from the passed table's metadata.
+ *
+ * <p>Returns a list of 0-based column index arrays that uniquely identify
a row for upsert
+ * semantics. This is distinct from {@link #partitionByColumns()}:
partition keys describe
+ * distribution and co-location, upsert keys describe row identity. Useful
for functions that
+ * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER
pairs, or want to have a
+ * unique identifier to interact with state.
+ *
+ * <p>Returns an empty list when no upsert key is derivable, or when the
planner has not yet
+ * computed metadata (during type inference).
+ *
+ * <p>When the planner derives multiple candidate upsert keys for the same
input (e.g., a table
+ * with several primary key constraints), all of them are returned.
Picking which candidate to
Review Comment:
> several primary key constraints
A table can have only one primary key constraint. Do we have a better
example?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java:
##########
@@ -348,14 +348,29 @@ public CallContext toCallContext(
@Nullable List<Integer> inputTimeColumns,
@Nullable List<ChangelogMode> inputChangelogModes,
@Nullable ChangelogMode outputChangelogMode) {
+ return toCallContext(
+ call, inputTimeColumns, inputChangelogModes,
outputChangelogMode, null);
+ }
+
+ /**
+ * Variant that additionally exposes the call's input upsert keys. Used by
the streaming codegen
+ * path so PTFs can specialize themselves on the input's row-identity
information.
+ */
+ public CallContext toCallContext(
Review Comment:
Is the old function still used? If not, we can drop the overload. I have
seen Claude doing this regularly.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>>
validateOpMappingKeys(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(1, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateProducesFullDeletes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isExplicit =
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+ if (!isExplicit) {
+ return Optional.empty();
+ }
+ if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'produces_full_deletes' argument must be a constant
BOOLEAN literal.");
+ }
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES,
Boolean.class).orElse(true);
+ if (!producesFullDeletes) {
+ return Optional.empty();
+ }
+ // The check against the input changelog mode lives in the function
constructor since
+ // TableSemantics#changelogMode() returns empty here at type-inference
time. The mapping
+ // check below only needs the literal op_mapping argument, so it lives
here. Only runs
+ // when the user explicitly set produces_full_deletes=true; the
default true is not
+ // validated since it is a safe no-op for any input.
+ final Optional<Map> opMapping =
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+ if (opMapping.isPresent() && !mapsDelete((Map<String, String>)
opMapping.get())) {
Review Comment:
I find this check a bit confusing. If I set `produces_full_deletes => true`,
I'm allowed to NOT contain DELETE, but if I set `produces_full_deletes =>
false`, I'm not allowed to. Either remove this check altogether, or use
`produces_partial_deletes => false`, which defaults to false, as would be
common for optional arguments.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +598,192 @@ public class ToChangelogTestPrograms {
ValidationException.class,
"Duplicate change operation: 'DELETE'")
.build();
+
+ public static final TableTestProgram
INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY =
+ TableTestProgram.of(
+
"to-changelog-invalid-produces-full-deletes-for-append-only",
+ "fails when produces_full_deletes=true on an input
that never emits DELETE rows")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => true)",
+ ValidationException.class,
+ "the input table only produces [INSERT] and never
emits DELETE rows")
+ .build();
+
+ //
--------------------------------------------------------------------------------------------
+ // Full vs partial deletes matrix (input kind x PARTITION BY x
produces_full_deletes)
+ //
--------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES =
+ TableTestProgram.of(
+ "to-changelog-retract-produces-partial-deletes",
+ "retract input in row semantics with
produces_full_deletes=false: skips ChangelogNormalize and the partial DELETE
row from the input passes through unchanged")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
Review Comment:
Add a test with NOT NULL on input columns.
```suggestion
"name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT NOT NULL")
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java:
##########
@@ -310,17 +326,21 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
final int timeColumn =
inputTimeColumns.get(tableArgCall.getInputIndex());
+ final int inputIndex = tableArgCall.getInputIndex();
Review Comment:
Move this variable higher and also reuse it in the line above.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
When `false`, the function relies on the input table's [upsert
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
Review Comment:
```suggestion
-- ChangelogNormalize calculates the full pre-image from state.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Upsert key
+
+An **upsert key** is a column or set of columns that uniquely identifies a row
across its lifecycle in a changelog. It is what downstream operators and sinks
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
+
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins,
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key
constraint and no grouping upstream), the input has no row identity and
downstream operators must treat it as append-only or fall back to retract
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve
when emitting partial DELETE rows. See [Full vs partial
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and
what the planner requires from the input. The matrix below shows each
combination with `PARTITION BY` (set semantics) and without (row semantics).
When `false`, the function relies on the input table's [upsert
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted
upstream to materialize the full pre-image from state. For sources that already
emit a full pre-image (e.g. retract), the flag is a no-op. The function then
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
+```
+
+**Set semantics** (`PARTITION BY`):
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE
rows. This avoids the stateful normalization operator for upsert sources (e.g.
Kafka compacted topics) where the full pre-image is not needed downstream.
Requires an [upsert key](#upsert-key) to be present for the input table (row
semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected
with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the
planner-derived upsert key columns on DELETE rows and nulls the rest. The
upsert key is typically a declared `PRIMARY KEY` when directly reading from a
source or the key provided in a `GROUP BY <key>`.
+
+```sql
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source,
produces_full_deletes => false)
+```
+
+**Set semantics** (`PARTITION BY`): the function preserves the partition key
and nulls every non-partition-key column on DELETE rows. The key used as the
partition-key column should be the unique key that will be used as the record
identifier. This matches the shape expected by upsert sinks and Kafka compacted
topics.
Review Comment:
```suggestion
**Set semantics** (`PARTITION BY`): the function preserves the partition key
and nulls every non-partition-key column on DELETE rows. The key used as the
partition-key column should be the unique key that will be used as the record
identifier. This matches the shape expected by upsert sinks such as Kafka
compacted topics.
```
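To make the partial DELETE shape concrete, here is a minimal sketch of "keep the key columns, null the rest". It is an assumption-laden illustration on plain `Object[]` rows; `PartialDeleteSketch` and `toPartialDelete` are hypothetical names, not the function's actual implementation.

```java
/** Hypothetical illustration only; not the TO_CHANGELOG implementation. */
final class PartialDeleteSketch {

    /**
     * Returns a copy of the row in which only the columns at the given
     * 0-based key indexes are preserved; every other column is null.
     */
    static Object[] toPartialDelete(Object[] row, int[] keyIndexes) {
        Object[] partial = new Object[row.length]; // all columns start as null
        for (int index : keyIndexes) {
            partial[index] = row[index];
        }
        return partial;
    }

    public static void main(String[] args) {
        // Assumed schema: 0 = id (partition key), 1 = name.
        Object[] fullDelete = {5, "Alice"};
        Object[] partial = toPartialDelete(fullDelete, new int[] {0});
        System.out.println(java.util.Arrays.toString(partial));
    }
}
```

The same helper would apply to row semantics by passing the planner-derived upsert key indexes instead of the partition key indexes.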
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java:
##########
@@ -781,6 +781,11 @@ public Optional<ChangelogMode> changelogMode() {
return Optional.empty();
}
+ @Override
+ public List<int[]> upsertKeyColumns() {
Review Comment:
Usually we use an array of arrays for index paths. All connector interfaces do
it this way.
```suggestion
public int[][] upsertKeyColumns() {
```
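A minimal sketch of what the change means for callers, assuming an implementation that currently collects its candidates in a `List<int[]>`. The class name and the sample indexes are hypothetical; only `List#toArray` is a real API here.

```java
import java.util.List;

/** Hypothetical illustration only; the sample key indexes are made up. */
final class UpsertKeyArraySketch {

    /** Converts a list of index arrays into the array-of-arrays shape. */
    static int[][] toIndexArrays(List<int[]> candidates) {
        return candidates.toArray(new int[0][]);
    }

    public static void main(String[] args) {
        // Two assumed candidates: column 0 alone, or columns 1 and 2 together.
        List<int[]> candidates = List.of(new int[] {0}, new int[] {1, 2});
        int[][] upsertKeys = toIndexArrays(candidates);
        for (int[] candidate : upsertKeys) {
            System.out.println(java.util.Arrays.toString(candidate));
        }
    }
}
```

An empty list converts to an empty `int[0][]`, so the "no upsert key derivable" case keeps a non-null representation.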
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -204,9 +204,19 @@ public interface PartitionedTable {
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
* );
+ *
+ * // Opt out of full-delete semantics. When `true` (default), DELETE rows
carry the full
+ * // pre-image. When `false`, only the identifying key columns are
preserved and the rest
+ * // are nulled. See [Full vs partial deletes](
+ * //
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
Review Comment:
These links in code become outdated rather quickly.
```suggestion
* in documentation.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>>
validateOpMappingKeys(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(1, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateProducesFullDeletes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isExplicit =
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+ if (!isExplicit) {
+ return Optional.empty();
+ }
+ if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'produces_full_deletes' argument must be a constant
BOOLEAN literal.");
+ }
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES,
Boolean.class).orElse(true);
+ if (!producesFullDeletes) {
+ return Optional.empty();
+ }
+ // The check against the input changelog mode lives in the function
constructor since
Review Comment:
```suggestion
// The check against the input changelog mode lives in the runtime function constructor since
```