yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799217964
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,11 +80,45 @@ public DataStream<Event> translatePostTransform(
}
}
postTransformFunctionBuilder.addTimezone(timezone);
+
+ List<UdfDef> allFunctions = new ArrayList<>(udfFunctions);
+ allFunctions.addAll(convertModelsToUdfs(models));
+
postTransformFunctionBuilder.addUdfFunctions(
- udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
- .collect(Collectors.toList()));
+ allFunctions.stream().map(this::udfDefToTuple2).collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
}
+
+ private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
+ return models.stream().map(this::modelToUdf).collect(Collectors.toList());
+ }
Review Comment:
This conversion is worrying: `ModelDef` carries more information than `UdfDef`
(the params), so the conversion cannot be done without a hacky workaround such
as pasting the params into the name field.
Changing `addUdfFunctions` so that it accepts a more general data structure
(instead of a `Name - Classpath` `Tuple2`) makes more sense to me.
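For illustration only, here is a minimal sketch of what such a structure might look like. `FunctionDescriptor`, `fromUdf`, and `fromModel` are hypothetical names, not existing flink-cdc APIs, and the real `UdfDef` / `ModelDef` types are replaced by plain strings and a map so that the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: these names are illustrative and are not flink-cdc APIs.
class FunctionDescriptorSketch {

    /** Carries what the (name, classpath) Tuple2 carries, plus optional parameters. */
    static final class FunctionDescriptor {
        private final String name;
        private final String classpath;
        private final Map<String, String> parameters;

        FunctionDescriptor(String name, String classpath, Map<String, String> parameters) {
            this.name = name;
            this.classpath = classpath;
            // Defensive copy so later changes to the caller's map are not visible here.
            this.parameters = Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
        }

        String getName() { return name; }
        String getClasspath() { return classpath; }
        Map<String, String> getParameters() { return parameters; }
    }

    /** A plain UDF has no parameters. */
    static FunctionDescriptor fromUdf(String name, String classpath) {
        return new FunctionDescriptor(name, classpath, Collections.<String, String>emptyMap());
    }

    /** A model keeps its parameters in their own field instead of encoding them in the name. */
    static FunctionDescriptor fromModel(String name, String classpath, Map<String, String> params) {
        return new FunctionDescriptor(name, classpath, params);
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("model.name", "example-model"); // made-up parameter for the demo
        FunctionDescriptor udf = fromUdf("format", "com.example.FormatFunction");
        FunctionDescriptor model = fromModel("embed", "com.example.EmbeddingModel", params);
        System.out.println(udf.getName() + " " + udf.getParameters());
        System.out.println(model.getName() + " " + model.getParameters());
    }
}
```

With something along these lines, `addUdfFunctions` could take a list of descriptors, and the model params would travel alongside the name and classpath rather than being squeezed into either of them.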