raminqaf commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3259867576


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
 
+    // --------------------------------------------------------------------------------------------
+    // Changelog mode inference
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Emits an upsert changelog when the input is partitioned (set semantics) and the resolved
+     * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}. In all other
+     * cases the output is a retract changelog. When upsert mode is selected, the partition key acts
+     * as the upsert key.
+     *
+     * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) upsert(false)})
+     * because the runtime forwards each input delete row with all fields populated; only the {@link
+     * org.apache.flink.types.RowKind} is rewritten.
+     */
+    public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+            ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : ChangelogMode.all();
+
+    /**
+     * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input
+     * table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER}
+     * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or
+     * cannot be resolved as a literal, since the default mapping includes both (retract).
+     */
+    @SuppressWarnings("unchecked")
+    public static boolean isUpsertConfig(final ChangelogContext ctx) {
+        final boolean partitioned =
+                ctx.getTableSemantics(ARG_TABLE)
+                        .map(ts -> ts.partitionByColumns().length > 0)
+                        .orElse(false);
+        if (!partitioned) {
+            return false;
+        }
+        final Optional<Map> opMapping = ctx.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isEmpty()) {
+            return false;
+        }
+        final Map<String, String> mapping = opMapping.get();
+        final boolean hasUpdateAfter =
+                mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));

Review Comment:
   Good catch! I now filter out the nulls before matching.
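
   For reference, a minimal sketch of the null-safe check described above. It assumes `op_mapping` resolves to a `Map<String, String>` whose values may be null or padded with whitespace. The class `OpMappingCheck` and its helper names are hypothetical and are not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the matching logic; not the actual PR code.
final class OpMappingCheck {

    static final String UPDATE_AFTER = "UPDATE_AFTER";
    static final String UPDATE_BEFORE = "UPDATE_BEFORE";

    /** Returns true if any non-null value, after trimming, equals the given kind. */
    static boolean mapsTo(Map<String, String> mapping, String kind) {
        return mapping.values().stream()
                .filter(Objects::nonNull) // skip nulls so trim() cannot throw
                .anyMatch(v -> kind.equals(v.trim()));
    }

    /** Upsert-style mapping: UPDATE_AFTER is present and UPDATE_BEFORE is not. */
    static boolean isUpsertMapping(Map<String, String> mapping) {
        return mapsTo(mapping, UPDATE_AFTER) && !mapsTo(mapping, UPDATE_BEFORE);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("c", "INSERT");
        mapping.put("u", " UPDATE_AFTER ");
        mapping.put("d", null); // a null value no longer causes a NullPointerException
        System.out.println(isUpsertMapping(mapping));
    }
}
```

   Filtering with `Objects::nonNull` ahead of `anyMatch` keeps the lambda free of explicit null checks. The partitioning check from `isUpsertConfig` is left out of this sketch on purpose.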



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.
 
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include `UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event with the partition key acting as the upsert key.
+
+```sql
+-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
+-- +I[id:1, op:'INSERT',       name:'Alice']
+-- +I[id:2, op:'INSERT',       name:'Bob']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE',       name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op_mapping => MAP[
+    'INSERT',       'INSERT',
+    'UPDATE_AFTER', 'UPDATE_AFTER',
+    'DELETE',       'DELETE']
+)
+
+-- Output (upsert changelog, upsert key = id):
+-- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+```
+
+Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.