raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311145532
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>>
validateOpMappingKeys(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(1, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateProducesFullDeletes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isExplicit =
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+ if (!isExplicit) {
+ return Optional.empty();
+ }
+ if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'produces_full_deletes' argument must be a constant
BOOLEAN literal.");
+ }
+ final boolean producesFullDeletes =
+ callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES,
Boolean.class).orElse(true);
+ if (!producesFullDeletes) {
+ return Optional.empty();
+ }
+ // The check against the input changelog mode lives in the function
constructor since
+ // TableSemantics#changelogMode() returns empty here at type-inference
time. The mapping
+ // check below only needs the literal op_mapping argument, so it lives
here. Only runs
+ // when the user explicitly set produces_full_deletes=true; the
default true is not
+ // validated since it is a safe no-op for any input.
+ final Optional<Map> opMapping =
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+ if (opMapping.isPresent() && !mapsDelete((Map<String, String>)
opMapping.get())) {
+ return callContext.fail(
+ throwOnFailure,
+ "Invalid 'produces_full_deletes' for TO_CHANGELOG: the
active 'op_mapping' "
+ + "does not map DELETE rows, so no DELETE rows are
emitted. Remove "
+ + "the 'produces_full_deletes' argument or add a
DELETE entry to "
+ + "'op_mapping'.");
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Returns {@code true} when at least one {@code op_mapping} key
references {@code DELETE}. Keys
+ * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the
user-facing contract.
+ */
+ private static boolean mapsDelete(final Map<String, String> opMapping) {
+ for (final String key : opMapping.keySet()) {
+ for (final String rawName : key.split(",")) {
+ if (DELETE.equals(rawName.trim())) {
Review Comment:
added
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
Review Comment:
added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]