raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311145532


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########
@@ -187,12 +198,63 @@ private static Optional<List<DataType>> 
validateOpMappingKeys(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since
+        // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
+        // check below only needs the literal op_mapping argument, so it lives 
here. Only runs
+        // when the user explicitly set produces_full_deletes=true; the 
default true is not
+        // validated since it is a safe no-op for any input.
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) 
opMapping.get())) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
+                            + "does not map DELETE rows, so no DELETE rows are 
emitted. Remove "
+                            + "the 'produces_full_deletes' argument or add a 
DELETE entry to "
+                            + "'op_mapping'.");
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns {@code true} when at least one {@code op_mapping} key 
references {@code DELETE}. Keys
+     * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the 
user-facing contract.
+     */
+    private static boolean mapsDelete(final Map<String, String> opMapping) {
+        for (final String key : opMapping.keySet()) {
+            for (final String rawName : key.split(",")) {
+                if (DELETE.equals(rawName.trim())) {

Review Comment:
   added



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java:
##########


Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to