ruanhang1993 commented on code in PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#discussion_r1708813058


##########
docs/content.zh/docs/core-concept/transform.md:
##########
@@ -266,6 +266,75 @@ transform:
     description: classification mapping example
 ```
 
+## User-defined Functions
+
+User-defined functions (UDFs) can be used in transform rules.
+
+Classes could be used as a UDF if:
+
+* implements `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface
+* has a public constructor with no parameters
+* has at least one public method named `eval`
+
+It may also:
+
+* overrides `getReturnType` method to indicate its return CDC type
+* overrides `open` and `close` method to do some initialization and cleanup work
+
+For example, this is a valid UDF class:
+
+```java
+public class AddOneFunctionClass implements UserDefinedFunction {
+    
+    public Object eval(Integer num) {
+        return num + 1;
+    }
+    
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.INT();
+    }
+    
+    @Override
+    public void open() throws Exception {
+        // ...
+    }
+
+    @Override
+    public void close() throws Exception {
+        // ...
+    }
+}
+```
+
+To ease the migration from Flink SQL to Flink CDC, a Flink `ScalarFunction` could also be used as a transform UDF, with some limitations:
+
+* `ScalarFunction` with parameters is not supported.

Review Comment:
   `ScalarFunction` which has a constructor with parameters is not supported.
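A plausible reason for this restriction is that the runtime creates UDF instances reflectively from the configured class name, so it has no way to supply constructor arguments. The sketch below shows such a check under that assumption; `UdfConstructorCheck`, `AddOne`, and `AddN` are hypothetical names for illustration only, not Flink CDC classes.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

/** Hypothetical helper (not part of Flink CDC): can this class be instantiated with no arguments? */
final class UdfConstructorCheck {

    private UdfConstructorCheck() {}

    /** True if the class is concrete and declares a public constructor taking no parameters. */
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        // Interfaces and abstract classes can never be instantiated directly.
        if (Modifier.isAbstract(clazz.getModifiers())) {
            return false;
        }
        // getConstructors() returns only the public constructors of the class.
        for (Constructor<?> ctor : clazz.getConstructors()) {
            if (ctor.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }
}

/** Accepted: explicit public no-arg constructor (an implicit one would be package-private here). */
class AddOne {
    public AddOne() {}

    public Object eval(Integer num) {
        return num + 1;
    }
}

/** Rejected: the only constructor takes a parameter that reflection cannot supply. */
class AddN {
    private final int n;

    public AddN(int n) {
        this.n = n;
    }

    public Object eval(Integer num) {
        return num + n;
    }
}
```

Note that the implicit default constructor inherits the class's access modifier, which is why `AddOne` declares its public constructor explicitly.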



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -199,6 +211,23 @@ private RouteDef toRouteDef(JsonNode routeNode) {
         return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
     }
 
+    private UdfDef toUdfDef(JsonNode udfNode) {
+        String functionName =
+                checkNotNull(
+                                udfNode.get(UDF_FUNCTION_NAME_KEY),
+                                "Missing required field \"%s\" in UDF configuration",
+                                UDF_FUNCTION_NAME_KEY)
+                        .asText();
+        String classPath =

Review Comment:
   ```suggestion
           String classpath =
   ```



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -96,6 +98,11 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
         env.getConfig().setParallelism(parallelism);
 
+        List<Tuple2<String, String>> udfFunctions =
+                pipelineDef.getUdfs().stream()
+                        .map(udf -> Tuple2.of(udf.getName(), udf.getClassPath()))
+                        .collect(Collectors.toList());

Review Comment:
   This translation should be moved into `TransformTranslator`; the composer should only pass `pipelineDef.getUdfs()` through.
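A minimal sketch of the suggested split, assuming the composer forwards the UDF definitions unchanged and the translator performs the (name, classpath) conversion. `UdfDefStub`, `TransformTranslatorSketch`, and `toUdfFunctions` are hypothetical stand-ins, and `Map.Entry` replaces Flink's `Tuple2` only to keep the example self-contained.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the PR's UdfDef, reduced to the two accessors used in the diff. */
final class UdfDefStub {
    private final String name;
    private final String classPath;

    UdfDefStub(String name, String classPath) {
        this.name = name;
        this.classPath = classPath;
    }

    String getName() {
        return name;
    }

    String getClassPath() {
        return classPath;
    }
}

/** Hypothetical translator that owns the UdfDef -> (name, classpath) conversion. */
final class TransformTranslatorSketch {

    private TransformTranslatorSketch() {}

    /** Converts definitions into (function name, class name) pairs for the operators. */
    static List<Map.Entry<String, String>> toUdfFunctions(List<UdfDefStub> udfs) {
        return udfs.stream()
                .<Map.Entry<String, String>>map(
                        udf -> new AbstractMap.SimpleImmutableEntry<>(
                                udf.getName(), udf.getClassPath()))
                .collect(Collectors.toList());
    }
}
```

Keeping the conversion in the translator means `compose` no longer needs to know the operator-side representation of a UDF.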



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -106,7 +113,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
 
         // Build TransformSchemaOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
-        stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());
+        stream =
+                transformTranslator.translateSchema(
+                        stream, pipelineDef.getTransforms(), udfFunctions);

Review Comment:
   ```suggestion
                           stream, pipelineDef.getTransforms(), pipelineDef.getUdfs());
   ```



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -125,7 +134,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         stream,
                         pipelineDef.getTransforms(),
                         schemaOperatorIDGenerator.generate(),
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
+                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+                        udfFunctions);

Review Comment:
   ```suggestion
                           pipelineDef.getUdfs());
   ```



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.definition;
+
+import java.util.Objects;
+
+/**
+ * Definition of a transformation.

Review Comment:
   ```suggestion
    * Definition of a user-defined function.
   ```
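For reference, a sketch of what a value class matching that Javadoc might look like. The class name `UdfDefSketch` and the accessor names are hypothetical; the field is named `classpath` following the rename suggested elsewhere in this review, and `equals`/`hashCode` use `java.util.Objects`, which the original file already imports.

```java
import java.util.Objects;

/** Definition of a user-defined function (hypothetical sketch, not the PR's class). */
final class UdfDefSketch {
    private final String name; // function name referenced in transform expressions
    private final String classpath; // fully-qualified name of the implementing class

    UdfDefSketch(String name, String classpath) {
        this.name = Objects.requireNonNull(name, "name");
        this.classpath = Objects.requireNonNull(classpath, "classpath");
    }

    String getName() {
        return name;
    }

    String getClasspath() {
        return classpath;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof UdfDefSketch)) {
            return false;
        }
        UdfDefSketch that = (UdfDefSketch) o;
        return name.equals(that.name) && classpath.equals(that.classpath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, classpath);
    }

    @Override
    public String toString() {
        return "UdfDef{name='" + name + "', classpath='" + classpath + "'}";
    }
}
```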



##########
flink-cdc-pipeline-udf-examples/pom.xml:
##########
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-pipeline-udf-examples</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <scala.plugin.version>4.9.2</scala.plugin.version>
+        <compiler.encoding>UTF-8</compiler.encoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- other plugins ... -->
+
+            <!-- set scala plugin before compiler plugin -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala.plugin.version}</version>
+                <configuration>
+                    <recompileMode>incremental</recompileMode>
+                    <javacArgs>
+                        <javacArg>-Xlint:unchecked</javacArg>
+                        <javacArg>-Xlint:deprecation</javacArg>
+                        <javacArg>-encoding</javacArg>
+                        <javacArg>${compiler.encoding}</javacArg>
+                    </javacArgs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>

Review Comment:
   Add a trailing newline at the end of the file.



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/udf/UserDefinedFunction.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.udf;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.types.DataType;
+
+/**
+ * Base interface for creating a UDF in transform projection and filtering expressions. You should
+ * define at least one {@code eval} method.
+ */
+@PublicEvolving
+public interface UserDefinedFunction {
+    default DataType getReturnType() {
+        return null;
+    }

Review Comment:
   ```suggestion
       default Optional<DataType> getReturnType() {
           return Optional.empty();
       }
   ```
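The benefit of the `Optional` variant is that callers must handle the "no declared type" case explicitly instead of risking a `NullPointerException`. The sketch below illustrates this with hypothetical stand-ins: `DataTypeStub` replaces `org.apache.flink.cdc.common.types.DataType`, and `ReturnTypeResolver` with its fallback parameter is an assumed caller, not the PR's actual logic. The real interface would also need `import java.util.Optional;`.

```java
import java.util.Optional;

/** Hypothetical stand-in for org.apache.flink.cdc.common.types.DataType. */
enum DataTypeStub {
    INT,
    STRING
}

/** Sketch of the suggested interface shape, with DataTypeStub in place of DataType. */
interface UserDefinedFunctionSketch {
    /** Empty means the UDF does not declare a return type. */
    default Optional<DataTypeStub> getReturnType() {
        return Optional.empty();
    }
}

/** Declares its return type explicitly. */
class AddOneSketch implements UserDefinedFunctionSketch {
    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public Optional<DataTypeStub> getReturnType() {
        return Optional.of(DataTypeStub.INT);
    }
}

/** Relies on the default, i.e. declares nothing. */
class EchoSketch implements UserDefinedFunctionSketch {
    public Object eval(String s) {
        return s;
    }
}

/** Hypothetical caller: prefer the declared type, otherwise use a fallback from elsewhere. */
final class ReturnTypeResolver {
    private ReturnTypeResolver() {}

    static DataTypeStub resolve(UserDefinedFunctionSketch udf, DataTypeStub fallback) {
        return udf.getReturnType().orElse(fallback);
    }
}
```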



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Descriptor of a UDF function. */
+@Internal
+public class UserDefinedFunctionDescriptor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+    private final String classPath;

Review Comment:
   ```suggestion
       private final String classpath;
   ```
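A sketch of how a descriptor like this might turn the stored `classpath` into a live instance, assuming the field holds a fully-qualified class name (for example `com.example.AddOne`) rather than a JVM classpath. `UdfDescriptorSketch` and `newInstance` are hypothetical; keeping only strings in the descriptor keeps it trivially `Serializable`, and the instance is created on the task side with an explicit class loader.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical, simplified descriptor: stores only strings and instantiates the UDF on demand. */
final class UdfDescriptorSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final String classpath; // assumed: fully-qualified class name of the UDF

    UdfDescriptorSketch(String name, String classpath) {
        this.name = Objects.requireNonNull(name, "name");
        this.classpath = Objects.requireNonNull(classpath, "classpath");
    }

    String getName() {
        return name;
    }

    String getClasspath() {
        return classpath;
    }

    /**
     * Loads the class through the given loader and invokes its public no-arg constructor.
     * Throws NoSuchMethodException if no such constructor exists.
     */
    Object newInstance(ClassLoader loader) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(classpath, true, loader);
        return clazz.getConstructor().newInstance();
    }
}
```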


