yuxiqian commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1855695505
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,9 +99,27 @@ public DataStream<Event> translatePostTransform(
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
+ .map(this::udfDefToNameAndClasspathTuple)
+ .collect(Collectors.toList()));
+ postTransformFunctionBuilder.addUdfFunctions(
+ models.stream()
+ .map(this::modelToNameAndClasspathTuple)
.collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(),
postTransformFunctionBuilder.build());
}
+
+ private Tuple2<String, String> modelToNameAndClasspathTuple(ModelDef model) {
+ try {
+ // A tricky way to pass parameters to UDF
+ String serializedParams = objectMapper.writeValueAsString(model.getParameters());
+ return Tuple2.of(serializedParams, model.getModel());
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ModelDef is illegal, ModelDef is " + model, e);
+ }
+ }
Review Comment:
What about widening the data structure (`Tuple2<String, String>`) passed
into the Transform node? Passing the serialized parameters through the tuple's
name slot seems unnecessary, since we don't need to guarantee backward
compatibility for internal APIs.
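
A minimal sketch of what a widened structure could look like, replacing `Tuple2<String, String>` with a small serializable holder that carries the parameters explicitly. `UdfSpec` and `ofUdf` are hypothetical names, not existing Flink CDC classes, and the sketch assumes model parameters are a string-to-string map:

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical holder (not part of Flink CDC): name, classpath and parameters
// travel as separate fields instead of being packed into a Tuple2.
final class UdfSpec implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final String classpath;
    private final Map<String, String> parameters; // assumed String -> String

    UdfSpec(String name, String classpath, Map<String, String> parameters) {
        this.name = Objects.requireNonNull(name, "name");
        this.classpath = Objects.requireNonNull(classpath, "classpath");
        // Defensive copy so later changes to the caller's map are not visible here.
        this.parameters =
                Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
    }

    // Plain UDFs carry no parameters.
    static UdfSpec ofUdf(String name, String classpath) {
        return new UdfSpec(name, classpath, Collections.emptyMap());
    }

    String getName() {
        return name;
    }

    String getClasspath() {
        return classpath;
    }

    Map<String, String> getParameters() {
        return parameters;
    }
}
```

With something along these lines, both the UDF and model mappers could return the same type, and the JSON round-trip through `objectMapper` would no longer be needed on the translator side.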