ruanhang1993 commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1857703283
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -84,10 +92,18 @@ public DataStream<Event> translatePostTransform(
}
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
- udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
- .collect(Collectors.toList()));
+ udfFunctions.stream().map(this::udfDefToUDFTuple).collect(Collectors.toList()));
+ postTransformFunctionBuilder.addUdfFunctions(
+ models.stream().map(this::modelToUDFTuple).collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
}
+
+ private Tuple3<String, String, Map<String, String>> modelToUDFTuple(ModelDef model) {
+ return Tuple3.of(model.getModelName(), model.getClassName(), model.getParameters());
Review Comment:
Add the classpath prefix here to reuse the logic in `UserDefinedFunctionDescriptor`.
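One possible reading of this suggestion, as a minimal sketch: the translator prepends the built-in model package itself, so the descriptor always receives a fully qualified class name. `ModelTupleSketch` and `UdfTuple` are hypothetical stand-ins (for the translator and Flink's `Tuple3`) so the sketch compiles without Flink; the prefix value is copied from the diff.

```java
import java.util.Map;

/** Hypothetical sketch, not the PR's code: build the tuple with the prefix already applied. */
public class ModelTupleSketch {
    /** Same value as PREFIX_CLASSPATH_BUILT_IN_MODEL in UserDefinedFunctionDescriptor. */
    static final String PREFIX_CLASSPATH_BUILT_IN_MODEL = "org.apache.flink.cdc.runtime.model.";

    /** Stand-in for Flink's Tuple3<String, String, Map<String, String>>. */
    static final class UdfTuple {
        final String name;
        final String classpath;
        final Map<String, String> parameters;

        UdfTuple(String name, String classpath, Map<String, String> parameters) {
            this.name = name;
            this.classpath = classpath;
            this.parameters = parameters;
        }
    }

    /** Assumes every model defined in the pipeline is a built-in one. */
    static UdfTuple modelToUDFTuple(
            String modelName, String className, Map<String, String> parameters) {
        return new UdfTuple(modelName, PREFIX_CLASSPATH_BUILT_IN_MODEL + className, parameters);
    }
}
```

With the prefix applied at this point, models and user-defined functions would reach the descriptor in the same form.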
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java:
##########
@@ -39,12 +41,29 @@ public class UserDefinedFunctionDescriptor implements Serializable {
private final DataType returnTypeHint;
private final boolean isCdcPipelineUdf;
+ private final Map<String, String> parameters;
+
+ /** Package of built-in model. */
+ public static final String PREFIX_CLASSPATH_BUILT_IN_MODEL =
+ "org.apache.flink.cdc.runtime.model.";
+
public UserDefinedFunctionDescriptor(String name, String classpath) {
+ this(name, classpath, new HashMap<>());
+ }
+
+ public UserDefinedFunctionDescriptor(
+ String name, String classpath, Map<String, String> parameters) {
+ if (classpath.contains(".")) {
Review Comment:
Is it right to use `.` to decide whether it is an AI model?
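To illustrate the concern: a class in the default package also has no `.` in its name, so the check would treat it as a built-in model. A minimal sketch of a more explicit alternative, assuming a hypothetical `ModelClasspathResolver` helper and a hypothetical set of built-in model names (neither is part of this PR):

```java
import java.util.Set;

/** Hypothetical helper, not part of the PR: decides explicitly which names are built-in models. */
public class ModelClasspathResolver {
    /** Same value as PREFIX_CLASSPATH_BUILT_IN_MODEL in the diff above. */
    static final String PREFIX_CLASSPATH_BUILT_IN_MODEL = "org.apache.flink.cdc.runtime.model.";

    /** Assumed set of built-in model simple names; the real list is not shown in this diff. */
    static final Set<String> BUILT_IN_MODELS = Set.of("ExampleBuiltInModel");

    /** Prefixes only known built-in models; every other name is returned unchanged. */
    static String resolve(String className) {
        if (BUILT_IN_MODELS.contains(className)) {
            return PREFIX_CLASSPATH_BUILT_IN_MODEL + className;
        }
        return className;
    }
}
```

Here a dot-free user class name such as `MyDefaultPackageUdf` is left as-is instead of being rewritten into the built-in model package.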