gustavodemorais commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3273315091
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java:
##########
@@ -108,6 +118,35 @@ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, Strin
         return result;
     }
+    /**
+     * Rejects mappings that reference change operations the input changelog cannot produce. Without
+     * this check the extra entries are dead code: the corresponding rows never arrive, so the user
+     * gets silently incorrect plans (for upsert input also a wasted {@code ChangelogNormalize}).
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateAgainstInputChangelogMode(
+            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
+        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) {
+            return;
+        }
+        final Set<RowKind> unsupported =
+                mapping.keySet().stream()
+                        .filter(kind -> !inputMode.contains(kind))
+                        .collect(Collectors.toCollection(TreeSet::new));
Review Comment:
Changed to `final List<RowKind> unsupported = mapping.keySet().stream().filter(kind -> !inputMode.contains(kind)).toList()`.
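
For readers without the Flink sources at hand, here is a minimal, self-contained sketch of the `toList()` variant. It is not the PR's code: the `RowKind` enum below is a hypothetical stand-in for Flink's, a plain `Set<RowKind>` stands in for `ChangelogMode`, and the class name, the helper name `findUnsupported`, and the use of an `EnumMap` for the mapping are all assumptions made for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnsupportedKindsSketch {

    // Hypothetical stand-in for Flink's RowKind so the sketch compiles without Flink.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Collects the mapped kinds that the input cannot produce.
    // inputKinds is an assumed stand-in for ChangelogMode#contains.
    static List<RowKind> findUnsupported(
            final Map<RowKind, String> mapping, final Set<RowKind> inputKinds) {
        return mapping.keySet().stream()
                .filter(kind -> !inputKinds.contains(kind))
                .toList(); // Stream.toList() requires Java 16+ and returns an unmodifiable list
    }

    public static void main(String[] args) {
        // Assumption: the mapping is an EnumMap, so keys iterate in declaration order.
        final Map<RowKind, String> mapping = new EnumMap<>(RowKind.class);
        mapping.put(RowKind.DELETE, "D");
        mapping.put(RowKind.INSERT, "I");
        mapping.put(RowKind.UPDATE_AFTER, "U");

        // An insert-only input: every other mapped kind is unsupported.
        final Set<RowKind> inputKinds = EnumSet.of(RowKind.INSERT);

        System.out.println(findUnsupported(mapping, inputKinds)); // prints [UPDATE_AFTER, DELETE]
    }
}
```

One difference from the `TreeSet` version is worth keeping in mind: `TreeSet` sorts the kinds by enum declaration order regardless of the map type, while `toList()` keeps whatever order the map's key set yields. With an `EnumMap` the two agree; with a `HashMap` the order of the list, and of any error message built from it, is not guaranteed.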