This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 63befd5c [Feature][flink-transform] add transform for register user
define function (#1826)
63befd5c is described below
commit 63befd5cb1b14d9c893c28d3d89ac28c3282f532
Author: dijie <[email protected]>
AuthorDate: Tue May 10 18:09:18 2022 +0800
[Feature][flink-transform] add transform for register user define function
(#1826)
* add transform for register user define function
---
docs/en/transform/udf.md | 42 ++++++++
seatunnel-core/seatunnel-core-flink/pom.xml | 6 ++
.../seatunnel-transforms-flink/pom.xml | 1 +
.../{ => seatunnel-transform-flink-udf}/pom.xml | 33 ++++--
.../org/apache/seatunnel/flink/transform/UDF.java | 120 +++++++++++++++++++++
.../org.apache.seatunnel.flink.BaseFlinkTransform | 18 ++++
6 files changed, 210 insertions(+), 10 deletions(-)
diff --git a/docs/en/transform/udf.md b/docs/en/transform/udf.md
new file mode 100644
index 00000000..2f00d068
--- /dev/null
+++ b/docs/en/transform/udf.md
@@ -0,0 +1,42 @@
+# udf
+
+## Description
+
+Registers user-defined functions (UDFs) so that later transforms, such as `sql`, can call them.
+Specify each function's name and implementing class, and put the UDF jars on Flink's classpath
+or add them via `flink run -C file:///path/to/xxx.jar`.
+
+:::tip
+
+This transform is **ONLY** supported by Flink.
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| -------------- | ----------- | -------- | ------------- |
+| function | string | yes | - |
+
+### function [string]
+
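+A config prefix. Each entry `function.<name> = "<class name>"` registers the fully qualified class `<class name>` as a function called `<name>`; for example, `function.test = "xxx.Test"` registers `xxx.Test` as `test`.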
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details.
+
+## Examples
+
+Register two functions with `udf`, then call one of them from a `sql` transform.
+
+```bash
+ udf {
+ function.test_1 = "com.example.udf.flink.TestUDF"
+ function.test_2 = "com.example.udf.flink.TestUDTF"
+ }
+
+ # Use the registered function (make sure the `fake` table exists)
+ sql {
+ sql = "select test_1(name), age from fake"
+ }
+```
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml
b/seatunnel-core/seatunnel-core-flink/pom.xml
index d985628a..58df75c4 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -72,6 +72,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-flink-udf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
b/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
index c4fabee7..bcf2d36d 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
@@ -35,6 +35,7 @@
<module>seatunnel-transform-flink-datastream2table</module>
<module>seatunnel-transform-flink-sql</module>
<module>seatunnel-transform-flink-split</module>
+ <module>seatunnel-transform-flink-udf</module>
</modules>
</project>
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
similarity index 58%
copy from seatunnel-transforms/seatunnel-transforms-flink/pom.xml
copy to
seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
index c4fabee7..612a3270 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/pom.xml
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/pom.xml
@@ -22,19 +22,32 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-transforms</artifactId>
+ <artifactId>seatunnel-transforms-flink</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-transforms-flink</artifactId>
- <packaging>pom</packaging>
+ <artifactId>seatunnel-transform-flink-udf</artifactId>
- <modules>
- <module>seatunnel-transform-flink-table2datastream</module>
- <module>seatunnel-transform-flink-datastream2table</module>
- <module>seatunnel-transform-flink-sql</module>
- <module>seatunnel-transform-flink-split</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-flink</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
-</project>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
new file mode 100644
index 00000000..7c5d7f2e
--- /dev/null
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.transform;
+
+import org.apache.seatunnel.common.PropertiesUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@SuppressWarnings("PMD")
+public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UDF.class);
+ private static final String UDF_CONFIG_PREFIX = "function.";
+
+ private Config config;
+ private List<String> classNames;
+ private List<String> functionNames;
+
+ @Override
+ public DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data) {
+ return data;
+ }
+
+ @Override
+ public DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row>
dataStream) {
+ return dataStream;
+ }
+
+ @Override
+ public void registerFunction(FlinkEnvironment flinkEnvironment) {
+ TableEnvironment tEnv = flinkEnvironment.isStreaming() ?
+ flinkEnvironment.getStreamTableEnvironment() :
flinkEnvironment.getBatchTableEnvironment();
+
+ for (int i = 0; i < functionNames.size(); i++) {
+ try {
+ tEnv.createTemporarySystemFunction(functionNames.get(i),
(Class<? extends UserDefinedFunction>) Class.forName(classNames.get(i)));
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("The udf class {} not founded, make sure you
enter the correct class name", classNames.get(i));
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ hasSubConfig(UDF_CONFIG_PREFIX);
+ return CheckResult.success();
+ }
+
+ @Override
+ public void prepare(FlinkEnvironment prepareEnv) {
+ final Properties properties = new Properties();
+ PropertiesUtil.setProperties(config, properties, UDF_CONFIG_PREFIX,
false);
+
+ classNames = new ArrayList<>(properties.size());
+ functionNames = new ArrayList<>(properties.size());
+
+ properties.forEach((k, v) -> {
+ classNames.add(String.valueOf(k));
+ functionNames.add(String.valueOf(k));
+ });
+ }
+
+ @Override
+ public String getPluginName() {
+ return "udf";
+ }
+
+ private void hasSubConfig(String prefix){
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ if (entry.getKey().startsWith(prefix)){
+ return;
+ }
+ }
+ throw new RuntimeException(String.format("No config start with %s!,
please check your transform config!", prefix));
+ }
+}
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
new file mode 100644
index 00000000..e65f8fb5
--- /dev/null
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.flink.transform.UDF