yuxiqian commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1855695505


##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,9 +99,27 @@ public DataStream<Event> translatePostTransform(
         postTransformFunctionBuilder.addTimezone(timezone);
         postTransformFunctionBuilder.addUdfFunctions(
                 udfFunctions.stream()
-                        .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
+                        .map(this::udfDefToNameAndClasspathTuple)
+                        .collect(Collectors.toList()));
+        postTransformFunctionBuilder.addUdfFunctions(
+                models.stream()
+                        .map(this::modelToNameAndClasspathTuple)
                         .collect(Collectors.toList()));
         return input.transform(
                 "Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
     }
+
+    private Tuple2<String, String> modelToNameAndClasspathTuple(ModelDef model) {
+        try {
+            // A tricky way to pass parameters to UDF
+            String serializedParams = objectMapper.writeValueAsString(model.getParameters());
+            return Tuple2.of(serializedParams, model.getModel());
+        } catch (Exception e) {
+            throw new IllegalArgumentException("ModelDef is illegal, ModelDef is " + model, e);
+        }
+    }

Review Comment:
   What about widening the data structure (`Tuple2<String, String>`) passed
into the Transform node, so that model parameters get a slot of their own?
Serializing the parameters into the name slot seems like an unnecessary hack,
since we don't need to guarantee backwards compatibility for internal APIs.
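
   To illustrate, here is a rough sketch of what a widened structure could
look like. `FunctionSpec` and its factory methods are hypothetical names made
up for this comment, not existing Flink CDC classes, and I'm assuming the
parameters can be treated as a `Map<String, String>`. The point is only that
the name, classpath, and parameters each get a slot, so nothing has to be
JSON-encoded into the name field.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical carrier type (not part of Flink CDC): replaces
// Tuple2<String, String> with an explicit three-field structure.
final class FunctionSpec {
    private final String name;
    private final String classpath;
    private final Map<String, String> parameters;

    private FunctionSpec(String name, String classpath, Map<String, String> parameters) {
        this.name = name;
        this.classpath = classpath;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.parameters = Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
    }

    // Plain UDFs carry no parameters.
    static FunctionSpec ofUdf(String name, String classpath) {
        return new FunctionSpec(name, classpath, Collections.<String, String>emptyMap());
    }

    // Models pass their parameters explicitly instead of serializing them
    // into the name slot. The Map<String, String> shape is an assumption.
    static FunctionSpec ofModel(String name, String classpath, Map<String, String> parameters) {
        return new FunctionSpec(name, classpath, parameters);
    }

    String getName() {
        return name;
    }

    String getClasspath() {
        return classpath;
    }

    Map<String, String> getParameters() {
        return parameters;
    }
}
```

   With something like this, the translator could build both the UDF list and
the model list as `List<FunctionSpec>`, and the Transform operator would read
`getParameters()` directly instead of deserializing the name.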



