lvyanquan commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1856106643
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,9 +99,27 @@ public DataStream<Event> translatePostTransform(
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
+ .map(this::udfDefToNameAndClasspathTuple)
+ .collect(Collectors.toList()));
+ postTransformFunctionBuilder.addUdfFunctions(
+ models.stream()
+ .map(this::modelToNameAndClasspathTuple)
.collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
}
+
+ private Tuple2<String, String> modelToNameAndClasspathTuple(ModelDef model) {
+ try {
+ // A tricky way to pass parameters to UDF
+ String serializedParams = objectMapper.writeValueAsString(model.getParameters());
+ return Tuple2.of(serializedParams, model.getModel());
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ModelDef is illegal, ModelDef is " + model, e);
+ }
+ }
Review Comment:
Done.