gustavodemorais commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3258993550
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
Review Comment:
```suggestion
#### Upsert table
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include
`UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition
columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event
with the partition key acting as the upsert key.
+
+```sql
+-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:2, op:'INSERT', name:'Bob']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id,
+ op_mapping => MAP[
+ 'INSERT', 'INSERT',
+ 'UPDATE_AFTER', 'UPDATE_AFTER',
+ 'DELETE', 'DELETE']
+)
+
+-- Output (upsert changelog, upsert key = id):
+-- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+```
+
+Without `PARTITION BY`, or when the active `op_mapping` includes
`UPDATE_BEFORE`, the output remains a retract changelog.
Review Comment:
```suggestion
By default, without `PARTITION BY`, or when the active `op_mapping` includes
`UPDATE_BEFORE`, the output remains a retract changelog.
```
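Here is a minimal, self-contained Java sketch of what "the partition key acting as the upsert key" means for a consumer. It is hypothetical and not Flink code: `UpsertSketch`, `Kind`, `Change`, and `materialize` are illustrative names, and `Kind` is a local stand-in for `org.apache.flink.types.RowKind`. It applies the output events from the docs example to a map keyed on `id`. Because every event carries its key, an `UPDATE_AFTER` can overwrite the previous row without a preceding `UPDATE_BEFORE`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink API: models how a consumer applies an
// upsert changelog whose upsert key is the PARTITION BY column `id`.
public class UpsertSketch {

    // Local stand-in for org.apache.flink.types.RowKind (upsert subset only).
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final int id;
        final String name;

        Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    /** Applies events in arrival order; the latest event per key wins. */
    static Map<Integer, String> materialize(List<Change> changes) {
        Map<Integer, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // No UPDATE_BEFORE needed: the key alone identifies the row to overwrite.
                    state.put(c.id, c.name);
                    break;
                case DELETE:
                    state.remove(c.id);
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Output events from the documentation example (upsert key = id).
        List<Change> output = List.of(
                new Change(Kind.INSERT, 1, "Alice"),
                new Change(Kind.INSERT, 2, "Bob"),
                new Change(Kind.UPDATE_AFTER, 1, "Alice2"),
                new Change(Kind.DELETE, 2, "Bob"));
        System.out.println(materialize(output)); // prints {1=Alice2}
    }
}
```

The latest event per key wins. If the two events for `id = 1` were swapped, the map would end up holding `Alice` instead of `Alice2`, so the result depends on per-key arrival order.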
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
* the unpartitioned table.
*
+ * <p>Output changelog mode:
+ *
+ * <ul>
+ * <li><b>Retract</b> (default): the active {@code op_mapping} includes
{@code UPDATE_BEFORE}
+ * or no updates at all. The output emits {@code INSERT}, {@code
UPDATE_BEFORE}, {@code
+ * UPDATE_AFTER}, and {@code DELETE}.
+ * <li><b>Upsert</b>: the {@code op_mapping} maps to {@code
UPDATE_AFTER} without {@code
+ * UPDATE_BEFORE}. The output emits {@code INSERT}, {@code
UPDATE_AFTER}, and full {@code
Review Comment:
```suggestion
* UPDATE_BEFORE}. The output emits {@code INSERT}, {@code
UPDATE_AFTER}, and {@code
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ // --------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics)
and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected,
the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean)
upsert(false)})
+ * because the runtime forwards each input delete row with all fields
populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
Review Comment:
```suggestion
* <p>Upsert mode uses full deletes by default ({@link
ChangelogMode#upsert(boolean) upsert(false)}). We currently only support
consuming from a changelog stream with full deletes (where not only the primary
keys but all fields are populated).
```
nit: we could maybe even delete this and only add more info once we add
"consume_full_updates"
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -182,10 +183,26 @@ public RelWriter explainTerms(RelWriter pw) {
for (Ord<RelNode> ord : Ord.zip(inputs)) {
pw.input("input#" + ord.i, ord.e);
}
- return pw.item("invocation", scan.getCall())
+ pw.item("invocation", scan.getCall())
.item("uid", uid)
.item("select", String.join(",", getRowType().getFieldNames()))
.item("rowType", getRowType());
+ final Set<ImmutableBitSet> upsertKeys =
Review Comment:
I don't think you want to do this. This would change the plan output for all
PTFs and you'd theoretically have to regenerate all PTF tests in the code base.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -182,10 +183,26 @@ public RelWriter explainTerms(RelWriter pw) {
for (Ord<RelNode> ord : Ord.zip(inputs)) {
pw.input("input#" + ord.i, ord.e);
}
- return pw.item("invocation", scan.getCall())
+ pw.item("invocation", scan.getCall())
.item("uid", uid)
.item("select", String.join(",", getRowType().getFieldNames()))
.item("rowType", getRowType());
+ final Set<ImmutableBitSet> upsertKeys =
Review Comment:
You just want to output the upsertKeys in the plan, right? Then you should
adjust your tests to use the testing utility that allows you to do so, instead
of changing the default explain output for PTF nodes.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
* the unpartitioned table.
*
+ * <p>Output changelog mode:
+ *
+ * <ul>
+ * <li><b>Retract</b> (default): the active {@code op_mapping} includes
{@code UPDATE_BEFORE}
+ * or no updates at all. The output emits {@code INSERT}, {@code
UPDATE_BEFORE}, {@code
Review Comment:
```suggestion
* or only {@code INSERT} and {@code DELETE} pairs. The output
emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include
`UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition
columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event
with the partition key acting as the upsert key.
Review Comment:
```suggestion
To generate an `Upsert table`, the following requirements must be met:
* **Key Partitioning:** You must use `PARTITION BY <key>`, where the
partition key corresponds to the unique/primary key of the dataset.
* **Op Mapping Configuration:** The `op_mapping` must include `UPDATE_AFTER`
and must NOT include `UPDATE_BEFORE`.

**How it works:**
The engine assumes that the keys provided in the `PARTITION BY` clause
function as the unique upsert keys. The resulting output changelog becomes an
upsert table keyed on these partition columns. Each incoming row is evaluated
and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition
key as the explicit upsert key. Therefore, if the incoming changelog contains
unique keys (such as a primary key), they **must** be used in the `PARTITION
BY` clause.

The `FROM_CHANGELOG` PTF assumes that events arrive in order. If the source
itself does not guarantee ordering of events for the same `PARTITION BY` keys,
consider using `ORDER BY` <link>.
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
* the unpartitioned table.
*
+ * <p>Output changelog mode:
+ *
+ * <ul>
+ * <li><b>Retract</b> (default): the active {@code op_mapping} includes
{@code UPDATE_BEFORE}
+ * or no updates at all. The output emits {@code INSERT}, {@code
UPDATE_BEFORE}, {@code
Review Comment:
```suggestion
* or no updates at all. The output may emit {@code INSERT},
{@code UPDATE_BEFORE}, {@code
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
* the unpartitioned table.
*
+ * <p>Output changelog mode:
+ *
+ * <ul>
+ * <li><b>Retract</b> (default): the active {@code op_mapping} includes
{@code UPDATE_BEFORE}
+ * or no updates at all. The output emits {@code INSERT}, {@code
UPDATE_BEFORE}, {@code
+ * UPDATE_AFTER}, and {@code DELETE}.
+ * <li><b>Upsert</b>: the {@code op_mapping} maps to {@code
UPDATE_AFTER} without {@code
Review Comment:
```suggestion
* <li><b>Upsert</b>: the {@code op_mapping} maps {@code UPDATE_AFTER}
without {@code
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ // --------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics)
and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected,
the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean)
upsert(false)})
+ * because the runtime forwards each input delete row with all fields
populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
+ */
+ public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+ ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) :
ChangelogMode.all();
+
+ /**
+ * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert
changelog: the input
+ * table is partitioned AND the resolved {@code op_mapping} contains
{@code UPDATE_AFTER}
+ * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the
mapping is absent or
+ * cannot be resolved as a literal, since the default mapping includes
both (retract).
Review Comment:
```suggestion
* cannot be resolved as a literal. The default mapping yields a retract
changelog.
```
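The rule described in the Javadoc above can be restated as a small, self-contained Java sketch. The rule is: the input is partitioned, and the resolved `op_mapping` maps to `UPDATE_AFTER` without `UPDATE_BEFORE`. The sketch deliberately avoids Flink's `ChangelogModeStrategy` and call-context types. `ChangelogModeRule`, `Mode`, and `choose` are hypothetical names, and a `null` mapping models "absent or not resolvable as a literal".

```java
import java.util.Map;

// Hypothetical restatement of the rule in the Javadoc above; it does not use
// Flink's ChangelogModeStrategy, CallContext, or ChangelogMode types.
public class ChangelogModeRule {

    enum Mode { RETRACT, UPSERT }

    /**
     * @param partitioned whether the input table uses PARTITION BY (set semantics)
     * @param opMapping the resolved op_mapping literal, or null if it is absent or
     *     cannot be resolved as a literal
     */
    static Mode choose(boolean partitioned, Map<String, String> opMapping) {
        if (!partitioned || opMapping == null) {
            // Row semantics, or no resolvable mapping: fall back to retract.
            return Mode.RETRACT;
        }
        // Look at the target kinds (map values), not the source op codes (keys).
        boolean emitsUpdateAfter = opMapping.containsValue("UPDATE_AFTER");
        boolean emitsUpdateBefore = opMapping.containsValue("UPDATE_BEFORE");
        return emitsUpdateAfter && !emitsUpdateBefore ? Mode.UPSERT : Mode.RETRACT;
    }

    public static void main(String[] args) {
        Map<String, String> upsertMapping = Map.of(
                "INSERT", "INSERT", "UPDATE_AFTER", "UPDATE_AFTER", "DELETE", "DELETE");
        System.out.println(choose(true, upsertMapping));  // prints UPSERT
        System.out.println(choose(false, upsertMapping)); // prints RETRACT
    }
}
```

Under this rule, a mapping with only `INSERT` and `DELETE` also yields `RETRACT`, because it contains no `UPDATE_AFTER`.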
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
+ // --------------------------------------------------------------------------------------------
+ // Changelog mode inference
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Emits an upsert changelog when the input is partitioned (set semantics)
and the resolved
+ * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
UPDATE_BEFORE}. In all other
+ * cases the output is a retract changelog. When upsert mode is selected,
the partition key acts
+ * as the upsert key.
+ *
+ * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean)
upsert(false)})
+ * because the runtime forwards each input delete row with all fields
populated; only the {@link
+ * org.apache.flink.types.RowKind} is rewritten.
+ */
+ public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+ ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) :
ChangelogMode.all();
Review Comment:
The whole change here in this file is very nice and easy to read 🙂
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Upsert output
Review Comment:
Keeping this simple is a fair point, but after thinking about it a bit more I
still think it'd be good to have the section. Retract is the default mode, and
the changelog mode of a table is a user-facing property that users are aware of
and often deal with. So a section telling them what to pay attention to when
they want to produce an upsert table makes sense, imo.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -200,6 +200,69 @@ public class FromChangelogTestPrograms {
+ "input => TABLE cdc_stream PARTITION BY id)")
.build();
+ public static final TableTestProgram UPSERT_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-upsert-partition-by",
+ "PARTITION BY + op_mapping without UPDATE_BEFORE produces an "
+ + "upsert changelog keyed on the partition columns")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op STRING")
+ .producedValues(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"),
+ Row.of("Alice2", 1, "UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1, "Alice"),
+ Row.ofKind(RowKind.INSERT, 2, "Bob"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2, "Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY id, "
+ + "op_mapping => MAP["
+ + "'INSERT', 'INSERT', "
+ + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+ + "'DELETE', 'DELETE'])")
+ .build();
+
+ public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING =
Review Comment:
Agree with @fhueske
--
This is an automated message from the Apache Git Service.