This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 63befd5c [Feature][flink-transform] add transform for register user 
define function (#1826)
63befd5c is described below

commit 63befd5cb1b14d9c893c28d3d89ac28c3282f532
Author: dijie <[email protected]>
AuthorDate: Tue May 10 18:09:18 2022 +0800

    [Feature][flink-transform] add transform for register user define function 
(#1826)
    
    * add transform for register user define function
---
 docs/en/transform/udf.md                           |  42 ++++++++
 seatunnel-core/seatunnel-core-flink/pom.xml        |   6 ++
 .../seatunnel-transforms-flink/pom.xml             |   1 +
 .../{ => seatunnel-transform-flink-udf}/pom.xml    |  33 ++++--
 .../org/apache/seatunnel/flink/transform/UDF.java  | 120 +++++++++++++++++++++
 .../org.apache.seatunnel.flink.BaseFlinkTransform  |  18 ++++
 6 files changed, 210 insertions(+), 10 deletions(-)

diff --git a/docs/en/transform/udf.md b/docs/en/transform/udf.md
new file mode 100644
index 00000000..2f00d068
--- /dev/null
+++ b/docs/en/transform/udf.md
@@ -0,0 +1,42 @@
+# udf
+
+## Description
+
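+Registers user-defined functions (UDFs) so that they can be used by later transforms such as `sql`.
+Specify each function name together with its implementing class, and make the UDF jars available on Flink's
+classpath, e.g. by putting them in Flink's `lib` directory or adding them via `flink run -C file:///path/to/xxx.jar`.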
+
+:::tip
+
+This transform is **ONLY** supported by Flink.
+
+:::
+
+## Options
+
+| name           | type        | required | default value |
+| -------------- | ----------- | -------- | ------------- |
+| function       | string      | yes      | -             |
+
+### function [string]
+
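+A config prefix. Each entry `function.<name> = "<class name>"` registers the given UDF class under `<name>`; for example, `function.test = "xxx.Test"` makes `test(...)` callable in SQL.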
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details.
+
+## Examples
+
+Register UDFs with `udf`, then call them from the `sql` transform.
+
+```bash
+  udf {
+    function.test_1 = "com.example.udf.flink.TestUDF"
+    function.test_2 = "com.example.udf.flink.TestUDTF"
+  }
+  
+  # Use the registered function (make sure the `fake` table exists)
+  sql {
+    sql = "select test_1(name), age from fake"
+  }
+```
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml 
b/seatunnel-core/seatunnel-core-flink/pom.xml
index d985628a..58df75c4 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -72,6 +72,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-flink-udf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml 
b/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
index c4fabee7..bcf2d36d 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
@@ -35,6 +35,7 @@
         <module>seatunnel-transform-flink-datastream2table</module>
         <module>seatunnel-transform-flink-sql</module>
         <module>seatunnel-transform-flink-split</module>
+        <module>seatunnel-transform-flink-udf</module>
     </modules>
 
 </project>
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
similarity index 58%
copy from seatunnel-transforms/seatunnel-transforms-flink/pom.xml
copy to 
seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
index c4fabee7..612a3270 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
@@ -22,19 +22,32 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-transforms</artifactId>
+        <artifactId>seatunnel-transforms-flink</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-transforms-flink</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>seatunnel-transform-flink-udf</artifactId>
 
-    <modules>
-        <module>seatunnel-transform-flink-table2datastream</module>
-        <module>seatunnel-transform-flink-datastream2table</module>
-        <module>seatunnel-transform-flink-sql</module>
-        <module>seatunnel-transform-flink-split</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-flink</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
-</project>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
new file mode 100644
index 00000000..7c5d7f2e
--- /dev/null
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.transform;
+
+import org.apache.seatunnel.common.PropertiesUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@SuppressWarnings("PMD")
+public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UDF.class);
+    private static final String UDF_CONFIG_PREFIX = "function.";
+
+    private Config config;
+    private List<String> classNames;
+    private List<String> functionNames;
+
+    @Override
+    public DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data) {
+        return data;
+    }
+
+    @Override
+    public DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row> 
dataStream) {
+        return dataStream;
+    }
+
+    @Override
+    public void registerFunction(FlinkEnvironment flinkEnvironment) {
+        TableEnvironment tEnv = flinkEnvironment.isStreaming() ?
+                flinkEnvironment.getStreamTableEnvironment() : 
flinkEnvironment.getBatchTableEnvironment();
+
+        for (int i = 0; i < functionNames.size(); i++) {
+            try {
+                tEnv.createTemporarySystemFunction(functionNames.get(i), 
(Class<? extends UserDefinedFunction>) Class.forName(classNames.get(i)));
+            } catch (ClassNotFoundException e) {
+                LOGGER.error("The udf class {} not founded, make sure you 
enter the correct class name", classNames.get(i));
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        hasSubConfig(UDF_CONFIG_PREFIX);
+        return CheckResult.success();
+    }
+
+    @Override
+    public void prepare(FlinkEnvironment prepareEnv) {
+        final Properties properties = new Properties();
+        PropertiesUtil.setProperties(config, properties, UDF_CONFIG_PREFIX, 
false);
+
+        classNames = new ArrayList<>(properties.size());
+        functionNames = new ArrayList<>(properties.size());
+
+        properties.forEach((k, v) -> {
+            classNames.add(String.valueOf(k));
+            functionNames.add(String.valueOf(k));
+        });
+    }
+
+    @Override
+    public String getPluginName() {
+        return "udf";
+    }
+
+    private void hasSubConfig(String prefix){
+        for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+            if (entry.getKey().startsWith(prefix)){
+                return;
+            }
+        }
+        throw new RuntimeException(String.format("No config start with %s!, 
please check your transform config!", prefix));
+    }
+}
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
new file mode 100644
index 00000000..e65f8fb5
--- /dev/null
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.flink.transform.UDF

Reply via email to