This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 6736fede8 Flink supports udf function (#5067)
6736fede8 is described below
commit 6736fede891e7a5f0dbe6acc426adac2db065a72
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Jan 12 23:39:26 2024 +0800
Flink supports udf function (#5067)
* Flink supports udf function
* add flink udf hook
* Code optimization
---
.../deployment/ClusterDescriptorAdapter.java | 2 +-
.../client/sql/operation/OperationFactoryImpl.java | 1 +
.../client/sql/operation/impl/DDLOperation.java | 3 +
.../flink/client/sql/parser/SqlCommand.java | 2 +
.../client/sql/parser/SqlCommandParserImpl.java | 3 +
.../flink/client/utils/FlinkUdfUtils.java | 123 +++++++++++++++++++++
.../main/resources/linkis-engineconn.properties | 2 +-
.../executor/FlinkSQLComputationExecutor.scala | 2 +-
.../flink/hook/FlinkJarUdfEngineHook.scala | 107 ++++++++++++++++++
.../launch/FlinkEngineConnLaunchBuilder.scala | 18 ++-
.../engineplugin/flink/LinkisFlinkUdfExample.java | 26 +++++
11 files changed, 285 insertions(+), 4 deletions(-)
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index a5ac10203..594f8dd98 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
public static final long CLIENT_REQUEST_TIMEOUT =
FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong();
- protected final ExecutionContext executionContext;
+ public final ExecutionContext executionContext;
// jobId is not null only after job is submitted
private JobID jobId;
protected ApplicationId clusterID;
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
index 4329acb80..9f7d8deef 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
@@ -49,6 +49,7 @@ public class OperationFactoryImpl implements OperationFactory {
context, call.operands[0],
Boolean.parseBoolean(call.operands[1]));
break;
case CREATE_TABLE:
+ case CREATE_FUNCTION:
case DROP_TABLE:
case ALTER_TABLE:
case CREATE_CATALOG:
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
index 21f608112..ec674d515 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
@@ -59,6 +59,9 @@ public class DDLOperation implements NonJobOperation {
case CREATE_TABLE:
actionMsg = "create a table";
break;
+ case CREATE_FUNCTION:
+ actionMsg = "create a function";
+ break;
case CREATE_DATABASE:
actionMsg = "create a database";
break;
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
index 9f6ef738e..03c18c3bb 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
@@ -46,6 +46,8 @@ public enum SqlCommand {
CREATE_DATABASE,
+ CREATE_FUNCTION,
+
ALTER_DATABASE,
DROP_DATABASE,
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
index f8eb32605..211d899bc 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
@@ -152,6 +152,9 @@ public class SqlCommandParserImpl implements SqlCommandParser {
} else if (node instanceof SqlCreateDatabase) {
cmd = SqlCommand.CREATE_DATABASE;
operands = new String[] {stmt};
+ } else if (node instanceof SqlCreateFunction) {
+ cmd = SqlCommand.CREATE_FUNCTION;
+ operands = new String[] {stmt};
} else if (node instanceof SqlDropDatabase) {
cmd = SqlCommand.DROP_DATABASE;
operands = new String[] {stmt};
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
new file mode 100644
index 000000000..e299c08f7
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.client.utils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkUdfUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class);
+
+ private static final String CREATE_TEMP_FUNCTION_PATTERN =
+ "create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"";
+
+ private static final String CREATE_TEMP_FUNCTION_SQL =
+ "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";
+
+ public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) {
+ logger.info("Flink udf start add pipeline classpaths, jar path: {}", path);
+
+ try {
+ Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+ configuration.setAccessible(true);
+ Configuration conf = (Configuration) configuration.get(env);
+
+ Field confData = Configuration.class.getDeclaredField("confData");
+ confData.setAccessible(true);
+ Map<String, Object> map = (Map<String, Object>) confData.get(conf);
+ List<String> jarList = new ArrayList<>();
+ List<String> oldList =
+ conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList);
+ if (CollectionUtils.isNotEmpty(oldList)) {
+ jarList.addAll(oldList);
+ }
+ jarList.add(path);
+ map.put(PipelineOptions.CLASSPATHS.key(), jarList);
+ } catch (Exception e) {
+ logger.warn("Flink udf add pipeline classpaths failed", e);
+ }
+ }
+
+ public static void loadJar(String jarPath) {
+ logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath);
+
+ Method method = null;
+ Boolean accessible = null;
+ try {
+ method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+ accessible = method.isAccessible();
+
+ if (accessible == false) {
+ method.setAccessible(true);
+ }
+ URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+ method.invoke(classLoader, new URL(jarPath));
+
+ } catch (Exception e) {
+ logger.warn("Flink udf URLClassLoader loadJar failed", e);
+ } finally {
+ if (accessible != null) {
+ method.setAccessible(accessible);
+ }
+ }
+ }
+
+ public static String extractUdfClass(String statement) {
+ Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN);
+ Matcher matcher = pattern.matcher(statement);
+ if (matcher.find() && matcher.groupCount() >= 2) {
+ return matcher.group(2);
+ }
+ return "";
+ }
+
+ public static boolean isFlinkUdf(ClassLoader classLoader, String className) {
+ try {
+ Class<?> udfClass = classLoader.loadClass(className);
+ if (UserDefinedFunction.class.isAssignableFrom(udfClass)) {
+ return true;
+ }
+
+ } catch (ClassNotFoundException e) {
+ logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException:
{}", className);
+ }
+ return false;
+ }
+
+ public static String generateFlinkUdfSql(String name, String className) {
+ return String.format(CREATE_TEMP_FUNCTION_SQL, name, className);
+ }
+}
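
For context, a minimal usage sketch of the new FlinkUdfUtils helpers above. The wrapper class, function name (parse_str) and UDF class (org.example.udf.ParseStr) are illustrative placeholders, not part of this commit:

    import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils;

    public class FlinkUdfUtilsUsageSketch {
      public static void main(String[] args) {
        // A jar UDF register statement in the form matched by CREATE_TEMP_FUNCTION_PATTERN above.
        String registerFormat = "create temporary function parse_str as \"org.example.udf.ParseStr\"";

        // extractUdfClass pulls the implementation class out of the register statement.
        String udfClass = FlinkUdfUtils.extractUdfClass(registerFormat); // "org.example.udf.ParseStr"

        // generateFlinkUdfSql builds the statement the new hook later submits, i.e.
        // CREATE TEMPORARY FUNCTION IF NOT EXISTS parse_str AS 'org.example.udf.ParseStr'
        System.out.println(FlinkUdfUtils.generateFlinkUdfSql("parse_str", udfClass));
      }
    }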
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
index 201202307..587a150ed 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
@@ -18,5 +18,5 @@ wds.linkis.server.version=v1
wds.linkis.engineconn.debug.enable=true
#wds.linkis.keytab.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook
wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 60f1d9088..f835db969 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor(
with FlinkExecutor {
private var operation: JobOperation = _
- private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
+ var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
override def init(): Unit = {
setCodeParser(new SQLCodeParser)
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala
new file mode 100644
index 000000000..bc3d0f1f4
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.hook
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel, RunType}
+import org.apache.linkis.udf.utils.ConstantVar
+import org.apache.linkis.udf.vo.UDFInfoVo
+
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
+ override val udfType: BigInt = ConstantVar.UDF_JAR
+ override val category: String = ConstantVar.UDF
+ override val runType = RunType.SQL
+
+ var labels: Array[Label[_]] = null
+
+ override protected def constructCode(udfInfo: UDFInfoVo): String = {
+ val path: String = udfInfo.getPath
+ val registerFormat: String = udfInfo.getRegisterFormat
+
+ if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) {
+ logger.warn("Flink udfInfo path or registerFormat cannot is empty")
+ return ""
+ }
+
+ val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
+ if (StringUtils.isBlank(udfClassName)) {
+ logger.warn("Flink extract udf class name cannot is empty")
+ return ""
+ }
+
+ FlinkUdfUtils.loadJar(path)
+
+ if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
+ logger.warn(
+ "Class does not extend Flink UserDefinedFunction, skip loading flink udf: {}",
+ path
+ )
+ return ""
+ }
+
+ val flinkUdfSql: String =
+ FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)
+
+ logger.info(
+ s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path},
udfClass:${udfClassName}\n"
+ )
+
+ if (labels != null && labels.nonEmpty) {
+ val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
+ executor match {
+ case computationExecutor: FlinkSQLComputationExecutor =>
+ FlinkUdfUtils.addFlinkPipelineClasspaths(
+ computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment,
+ path
+ )
+ case _ =>
+ }
+ }
+
+ "%sql\n" + flinkUdfSql
+ }
+
+ override def afterExecutionExecute(
+ engineCreationContext: EngineCreationContext,
+ engineConn: EngineConn
+ ): Unit = {
+ val codeLanguageLabel = new CodeLanguageLabel
+ engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match {
+ case Some(engineTypeLabel) =>
+ codeLanguageLabel.setCodeType(
+ getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString
+ )
+ case None =>
+ codeLanguageLabel.setCodeType(runType.toString)
+ }
+ labels = Array[Label[_]](codeLanguageLabel)
+
+ super.afterExecutionExecute(engineCreationContext, engineConn)
+ }
+
+}
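
For reference, a minimal sketch of the classpath injection the hook performs through FlinkUdfUtils, assuming a local StreamExecutionEnvironment and a placeholder jar path (class name and path are illustrative, not from this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils;

    public class PipelineClasspathSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Appends the UDF jar to pipeline.classpaths (via reflection on the environment's
        // Configuration), mirroring what FlinkJarUdfEngineHook does for the executor's
        // StreamExecutionEnvironment; the jar path below is a placeholder.
        FlinkUdfUtils.addFlinkPipelineClasspaths(env, "file:///tmp/linkis-flink-udf-example.jar");
      }
    }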
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
index 70b3ad1b2..13a5bae4d 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
@@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
addPathToClassPath,
CLASS_PATH_SEPARATOR
}
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel}
+import org.apache.linkis.manager.label.utils.LabelUtil
import java.util
import scala.collection.JavaConverters._
+import com.google.common.collect.Lists
+
class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
override protected def getCommands(implicit
@@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
override protected def ifAddHiveConfigPath: Boolean = true
+ override protected def getEngineConnManagerHooks(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): java.util.List[String] = if (isOnceMode) {
+ super.getEngineConnManagerHooks(engineConnBuildRequest)
+ } else {
+ Lists.newArrayList("JarUDFLoadECMHook")
+ }
+
+ def isOnceMode: Boolean = {
+ val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
+ EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
+ }
+
}
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java
new file mode 100644
index 000000000..ce2b05e69
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.flink;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class LinkisFlinkUdfExample extends ScalarFunction {
+ public String eval(String str) {
+ return String.format("linkis flink udf test: %s", str);
+ }
+}
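
As a quick smoke test, the example UDF above could be registered in a plain Flink TableEnvironment, matching the effect of the CREATE TEMPORARY FUNCTION statement the hook generates. The test class, function name (linkis_flink_udf_example) and query are illustrative only, not part of this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample;

    public class LinkisFlinkUdfExampleSmokeTest {
      public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register the example scalar function under an illustrative temporary name, the same
        // effect as: CREATE TEMPORARY FUNCTION IF NOT EXISTS linkis_flink_udf_example AS '...'
        tEnv.createTemporaryFunction("linkis_flink_udf_example", LinkisFlinkUdfExample.class);
        // Exact SQL support depends on the Flink version; VALUES is used here for portability.
        tEnv.executeSql("SELECT linkis_flink_udf_example(s) FROM (VALUES ('hello')) AS t(s)").print();
      }
    }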
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]