gustavodemorais commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3273315091


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable 
final Map<String, Strin
         return result;
     }
 
+    /**
+     * Rejects mappings that reference change operations the input changelog 
cannot produce. Without
+     * this check the extra entries are dead code: the corresponding rows 
never arrive, so the user
+     * gets silently incorrect plans (for upsert input also a wasted {@code 
ChangelogNormalize}).
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and 
is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateAgainstInputChangelogMode(
+            final Map<RowKind, String> mapping, final TableSemantics 
tableSemantics) {
+        final ChangelogMode inputMode = 
tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) {
+            return;
+        }
+        final Set<RowKind> unsupported =
+                mapping.keySet().stream()
+                        .filter(kind -> !inputMode.contains(kind))
+                        .collect(Collectors.toCollection(TreeSet::new));

Review Comment:
   Changed to `final List<RowKind> unsupported =
                   mapping.keySet().stream().filter(kind -> 
!inputMode.contains(kind)).toList()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to