fhueske commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3272273190


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -68,4 +70,30 @@ static TraitCondition not(final TraitCondition condition) {
         return new BuiltInCondition(
                 BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
     }
+
+    /**
+     * True when the named {@code MAP<STRING, STRING>} scalar argument has a key that, after
+     * splitting on comma and trimming each part, equals {@code key}. Returns true when the argument
+     * is omitted, on the assumption that an absent argument means the function falls back to a
+     * default that includes all keys.
+     */
+    @SuppressWarnings("rawtypes")
+    static TraitCondition mapArgIncludesKey(final String argName, final String key) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.MAP_ARG_INCLUDES_KEY,
+                List.of(argName, key),
+                ctx ->
+                        ctx.getScalarArgument(argName, Map.class)
+                                .map(map -> mapKeysContain(map, key))
+                                .orElse(true));
+    }

Review Comment:
   Would it make sense to define this more generically, e.g.:
   
   ```suggestion
   static <X> TraitCondition argMatches(final String argName, final Class<X> argClass, final Predicate<X> predicate) {
           return new BuiltInCondition(
                   BuiltInCondition.Kind.ARG_MATCHES,
                   List.of(argName, argClass, predicate),
                   ctx ->
                           ctx.getScalarArgument(argName, argClass)
                                   .stream().anyMatch(predicate));
       }
   ```
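
One detail worth checking before adopting the generic form: `Optional.stream().anyMatch(...)` evaluates to `false` for an absent argument, whereas the original `mapArgIncludesKey` returns `true` via `.orElse(true)`. The sketch below illustrates the difference with plain `Optional` values. The class `ArgMatchesSketch`, its methods, and the simplified `mapKeysContain` are hypothetical stand-ins for illustration, not Flink's `TraitCondition` API.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical stand-in for illustration; not Flink's TraitCondition or its context. */
public class ArgMatchesSketch {

    /** Mirrors the suggested shape: an absent argument never matches. */
    static <X> boolean argMatches(Optional<X> arg, Predicate<X> predicate) {
        return arg.stream().anyMatch(predicate);
    }

    /** Keeps the contract of the original method: an absent argument counts as a match. */
    static <X> boolean argMatchesOrAbsent(Optional<X> arg, Predicate<X> predicate) {
        return arg.map(predicate::test).orElse(true);
    }

    /** Simplified key check (assumption): split each map key on commas, trim, compare. */
    static boolean mapKeysContain(Map<String, String> map, String key) {
        return map.keySet().stream()
                .flatMap(k -> Arrays.stream(k.split(",")))
                .map(String::trim)
                .anyMatch(key::equals);
    }

    public static void main(String[] args) {
        Optional<Map<String, String>> absent = Optional.empty();
        Optional<Map<String, String>> present =
                Optional.of(Map.of("INSERT, UPDATE_AFTER", "op"));
        Predicate<Map<String, String>> hasDelete = m -> mapKeysContain(m, "DELETE");

        // Absent argument: the two variants disagree.
        System.out.println("suggested, absent:  " + argMatches(absent, hasDelete));
        System.out.println("original, absent:   " + argMatchesOrAbsent(absent, hasDelete));
        // Present argument: both variants simply apply the predicate.
        System.out.println("suggested, present: " + argMatches(present, hasDelete));
        System.out.println("original, present:  " + argMatchesOrAbsent(present, hasDelete));
    }
}
```

If the "absent means all keys" default should survive the refactoring, the generic helper would need something like the second variant, or an explicit default parameter.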



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -184,6 +184,37 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    public static final TableTestProgram UPSERT_PARTITION_BY =

Review Comment:
   Does it make sense to have a test program without full deletes?



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
 
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:

Review Comment:
   ```suggestion
   When a query includes an upsert source input (emits `UPDATE_AFTER` but no 
`UPDATE_BEFORE`), the planner typically inserts a `ChangelogNormalize` operator 
to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This 
operator is stateful and can be expensive. 
   By wrapping the upsert source input in a properly configured `TO_CHANGELOG` 
function, we can avoid the `ChangelogNormalize` operator. For this, the 
function's table input requires a `PARTITION BY` and an `op_mapping` that does 
not emit the corresponding kinds:
   ```
   
   I would rephrase this a bit and make clear that this is in the context of the `TO_CHANGELOG` function.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
 
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:

Review Comment:
   I wonder if this argument isn't a bit misleading. We "promise" that users can avoid `ChangelogNormalize` by using `TO_CHANGELOG`, but we don't explain what that really means.
   Users would need to handle all changes "manually" because Flink only sees an append-only stream.
   So it is certainly not a silver bullet, and it only applies to the few cases that allow for manual change handling (which most users won't be able to do, IMO).
   
   So isn't this more like an escape hatch for expert users?
   If so, it should be clearly positioned as such, IMO.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
 
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
+
+* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
+* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`.

Review Comment:
   Isn't it a bit unintuitive to support partial deletes by not declaring a `DELETE` key in the map?
   The function would still emit deletes (partial, not full) even though they aren't configured.
   Or am I misunderstanding something?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, Strin
         return result;
     }
 
+    /**
+     * Rejects mappings that reference change operations the input changelog cannot produce. Without
+     * this check the extra entries are dead code: the corresponding rows never arrive, so the user
+     * gets silently incorrect plans (for upsert input also a wasted {@code ChangelogNormalize}).
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateAgainstInputChangelogMode(
+            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
+        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) {
+            return;
+        }
+        final Set<RowKind> unsupported =
+                mapping.keySet().stream()
+                        .filter(kind -> !inputMode.contains(kind))
+                        .collect(Collectors.toCollection(TreeSet::new));

Review Comment:
   With `anyMatch` we couldn't list all ops that aren't present in the input.
   
   But a `List<RowKind>` would suffice as well (`keySet` ensures no duplicates).
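
A minimal sketch of the `List` variant, assuming the sorted order that `TreeSet` provided is still wanted for a deterministic error message (hence the explicit `sorted()`). The `Kind` enum and the class `UnsupportedKindsSketch` are hypothetical stand-ins for `RowKind` and the validation method, and a plain `Set<Kind>` stands in for `ChangelogMode`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for illustration; not the code in ToChangelogFunction. */
public class UnsupportedKindsSketch {

    /** Stand-in for Flink's RowKind (assumption: only the four names matter here). */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Collects every mapped kind that the input cannot produce. Unlike anyMatch, this keeps
     * all offenders so the error message can name each of them. Map keys are already unique,
     * so a List loses nothing compared to a Set; sorted() restores a stable order.
     */
    static List<Kind> unsupportedKinds(Map<Kind, String> mapping, Set<Kind> inputKinds) {
        return mapping.keySet().stream()
                .filter(kind -> !inputKinds.contains(kind))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Kind, String> mapping = new EnumMap<>(Kind.class);
        mapping.put(Kind.INSERT, "I");
        mapping.put(Kind.UPDATE_BEFORE, "UB");
        mapping.put(Kind.DELETE, "D");

        // Assumed input that only produces inserts and update-after rows.
        Set<Kind> inputKinds = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER);

        List<Kind> unsupported = unsupportedKinds(mapping, inputKinds);
        if (!unsupported.isEmpty()) {
            System.out.println("Mapping references kinds the input cannot produce: " + unsupported);
        }
    }
}
```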


