This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6736fede8 Flink supports udf function (#5067)
6736fede8 is described below

commit 6736fede891e7a5f0dbe6acc426adac2db065a72
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Jan 12 23:39:26 2024 +0800

    Flink supports udf function (#5067)
    
    * Flink supports udf function
    
    * add flink udf hook
    
    * Code optimization
---
 .../deployment/ClusterDescriptorAdapter.java       |   2 +-
 .../client/sql/operation/OperationFactoryImpl.java |   1 +
 .../client/sql/operation/impl/DDLOperation.java    |   3 +
 .../flink/client/sql/parser/SqlCommand.java        |   2 +
 .../client/sql/parser/SqlCommandParserImpl.java    |   3 +
 .../flink/client/utils/FlinkUdfUtils.java          | 123 +++++++++++++++++++++
 .../main/resources/linkis-engineconn.properties    |   2 +-
 .../executor/FlinkSQLComputationExecutor.scala     |   2 +-
 .../flink/hook/FlinkJarUdfEngineHook.scala         | 107 ++++++++++++++++++
 .../launch/FlinkEngineConnLaunchBuilder.scala      |  18 ++-
 .../engineplugin/flink/LinkisFlinkUdfExample.java  |  26 +++++
 11 files changed, 285 insertions(+), 4 deletions(-)

diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index a5ac10203..594f8dd98 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements 
Closeable {
   public static final long CLIENT_REQUEST_TIMEOUT =
       FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong();
 
-  protected final ExecutionContext executionContext;
+  public final ExecutionContext executionContext;
   // jobId is not null only after job is submitted
   private JobID jobId;
   protected ApplicationId clusterID;
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
index 4329acb80..9f7d8deef 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
@@ -49,6 +49,7 @@ public class OperationFactoryImpl implements OperationFactory 
{
                 context, call.operands[0], 
Boolean.parseBoolean(call.operands[1]));
         break;
       case CREATE_TABLE:
+      case CREATE_FUNCTION:
       case DROP_TABLE:
       case ALTER_TABLE:
       case CREATE_CATALOG:
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
index 21f608112..ec674d515 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java
@@ -59,6 +59,9 @@ public class DDLOperation implements NonJobOperation {
       case CREATE_TABLE:
         actionMsg = "create a table";
         break;
+      case CREATE_FUNCTION:
+        actionMsg = "create a function";
+        break;
       case CREATE_DATABASE:
         actionMsg = "create a database";
         break;
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
index 9f6ef738e..03c18c3bb 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java
@@ -46,6 +46,8 @@ public enum SqlCommand {
 
   CREATE_DATABASE,
 
+  CREATE_FUNCTION,
+
   ALTER_DATABASE,
 
   DROP_DATABASE,
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
index f8eb32605..211d899bc 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
@@ -152,6 +152,9 @@ public class SqlCommandParserImpl implements 
SqlCommandParser {
     } else if (node instanceof SqlCreateDatabase) {
       cmd = SqlCommand.CREATE_DATABASE;
       operands = new String[] {stmt};
+    } else if (node instanceof SqlCreateFunction) {
+      cmd = SqlCommand.CREATE_FUNCTION;
+      operands = new String[] {stmt};
     } else if (node instanceof SqlDropDatabase) {
       cmd = SqlCommand.DROP_DATABASE;
       operands = new String[] {stmt};
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
new file mode 100644
index 000000000..e299c08f7
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.client.utils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkUdfUtils {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FlinkUdfUtils.class);
+
+  private static final String CREATE_TEMP_FUNCTION_PATTERN =
+      "create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"";
+
+  private static final String CREATE_TEMP_FUNCTION_SQL =
+      "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";
+
+  public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment 
env, String path) {
+    logger.info("Flink udf start add pipeline classpaths, jar path: {}", path);
+
+    try {
+      Field configuration = 
StreamExecutionEnvironment.class.getDeclaredField("configuration");
+      configuration.setAccessible(true);
+      Configuration conf = (Configuration) configuration.get(env);
+
+      Field confData = Configuration.class.getDeclaredField("confData");
+      confData.setAccessible(true);
+      Map<String, Object> map = (Map<String, Object>) confData.get(conf);
+      List<String> jarList = new ArrayList<>();
+      List<String> oldList =
+          
conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList);
+      if (CollectionUtils.isNotEmpty(oldList)) {
+        jarList.addAll(oldList);
+      }
+      jarList.add(path);
+      map.put(PipelineOptions.CLASSPATHS.key(), jarList);
+    } catch (Exception e) {
+      logger.warn("Flink udf add pipeline classpaths failed", e);
+    }
+  }
+
+  public static void loadJar(String jarPath) {
+    logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath);
+
+    Method method = null;
+    Boolean accessible = null;
+    try {
+      method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+      accessible = method.isAccessible();
+
+      if (accessible == false) {
+        method.setAccessible(true);
+      }
+      URLClassLoader classLoader = (URLClassLoader) 
ClassLoader.getSystemClassLoader();
+      method.invoke(classLoader, new URL(jarPath));
+
+    } catch (Exception e) {
+      logger.warn("Flink udf URLClassLoader loadJar failed", e);
+    } finally {
+      if (accessible != null) {
+        method.setAccessible(accessible);
+      }
+    }
+  }
+
+  public static String extractUdfClass(String statement) {
+    Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN);
+    Matcher matcher = pattern.matcher(statement);
+    if (matcher.find() && matcher.groupCount() >= 2) {
+      return matcher.group(2);
+    }
+    return "";
+  }
+
+  public static boolean isFlinkUdf(ClassLoader classLoader, String className) {
+    try {
+      Class<?> udfClass = classLoader.loadClass(className);
+      if (UserDefinedFunction.class.isAssignableFrom(udfClass)) {
+        return true;
+      }
+
+    } catch (ClassNotFoundException e) {
+      logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException: 
{}", className);
+    }
+    return false;
+  }
+
+  public static String generateFlinkUdfSql(String name, String className) {
+    return String.format(CREATE_TEMP_FUNCTION_SQL, name, className);
+  }
+}
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
index 201202307..587a150ed 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties
@@ -18,5 +18,5 @@ wds.linkis.server.version=v1
 wds.linkis.engineconn.debug.enable=true
 #wds.linkis.keytab.enable=true
 
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook
 
wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager
\ No newline at end of file
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 60f1d9088..f835db969 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor(
     with FlinkExecutor {
 
   private var operation: JobOperation = _
-  private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
+  var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
 
   override def init(): Unit = {
     setCodeParser(new SQLCodeParser)
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala
new file mode 100644
index 000000000..bc3d0f1f4
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.hook
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import 
org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
+import 
org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, 
EngineTypeLabel, RunType}
+import org.apache.linkis.udf.utils.ConstantVar
+import org.apache.linkis.udf.vo.UDFInfoVo
+
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
/**
 * Engine hook that loads jar udfs for the Flink engine: it registers the jar in the local JVM,
 * adds it to the Flink pipeline classpaths, and emits the `CREATE TEMPORARY FUNCTION` DDL.
 */
class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
  override val udfType: BigInt = ConstantVar.UDF_JAR
  override val category: String = ConstantVar.UDF
  override val runType = RunType.SQL

  // Code-type labels of the current engine, captured in afterExecutionExecute so that
  // constructCode can locate the running FlinkSQLComputationExecutor.
  var labels: Array[Label[_]] = null

  /**
   * Builds the register statement for one jar udf. Returns an empty string (i.e. skips the udf)
   * when the udf info is incomplete, the class cannot be extracted, or the class does not extend
   * Flink's UserDefinedFunction.
   */
  override protected def constructCode(udfInfo: UDFInfoVo): String = {
    val path: String = udfInfo.getPath
    val registerFormat: String = udfInfo.getRegisterFormat

    // Both fields are required: `path` to load the jar, `registerFormat` to find the class.
    // Use || so the udf is skipped as soon as either one is missing (with && a blank path
    // combined with a non-blank registerFormat would fall through to loadJar("")).
    if (StringUtils.isBlank(path) || StringUtils.isBlank(registerFormat)) {
      logger.warn("Flink udfInfo path or registerFormat is empty, skip loading udf")
      return ""
    }

    val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat)
    if (StringUtils.isBlank(udfClassName)) {
      logger.warn("Flink udf class name extracted from registerFormat is empty, skip loading udf")
      return ""
    }

    FlinkUdfUtils.loadJar(path)

    if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) {
      logger.warn(
        "Udf class does not extend Flink UserDefinedFunction, skip loading flink udf: {} ",
        path
      )
      return ""
    }

    val flinkUdfSql: String =
      FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName)

    logger.info(
      s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n"
    )

    // Ship the jar to the cluster by adding it to the executor's pipeline classpaths.
    if (labels != null && labels.nonEmpty) {
      val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
      executor match {
        case computationExecutor: FlinkSQLComputationExecutor =>
          FlinkUdfUtils.addFlinkPipelineClasspaths(
            computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment,
            path
          )
        case _ =>
      }
    }

    "%sql\n" + flinkUdfSql
  }

  /**
   * Captures the engine's code-language label (derived from its EngineTypeLabel when present)
   * before delegating to the parent hook, so constructCode can resolve the matching executor.
   */
  override def afterExecutionExecute(
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn
  ): Unit = {
    val codeLanguageLabel = new CodeLanguageLabel
    engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match {
      case Some(engineTypeLabel) =>
        codeLanguageLabel.setCodeType(
          getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString
        )
      case None =>
        codeLanguageLabel.setCodeType(runType.toString)
    }
    labels = Array[Label[_]](codeLanguageLabel)

    super.afterExecutionExecute(engineCreationContext, engineConn)
  }

}
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
index 70b3ad1b2..13a5bae4d 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
@@ -37,12 +37,15 @@ import 
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
   addPathToClassPath,
   CLASS_PATH_SEPARATOR
 }
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, 
UserCreatorLabel}
+import org.apache.linkis.manager.label.utils.LabelUtil
 
 import java.util
 
 import scala.collection.JavaConverters._
 
+import com.google.common.collect.Lists
+
 class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
 
   override protected def getCommands(implicit
@@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends 
JavaProcessEngineConnLaunchBuilder {
 
   override protected def ifAddHiveConfigPath: Boolean = true
 
+  override protected def getEngineConnManagerHooks(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): java.util.List[String] = if (isOnceMode) {
+    super.getEngineConnManagerHooks(engineConnBuildRequest)
+  } else {
+    Lists.newArrayList("JarUDFLoadECMHook")
+  }
+
+  def isOnceMode: Boolean = {
+    val engineConnMode = 
LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
+    EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
+  }
+
 }
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java
new file mode 100644
index 000000000..ce2b05e69
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.flink;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class LinkisFlinkUdfExample extends ScalarFunction {
+  public String eval(String str) {
+    return String.format("linkis flink udf test: %s", str);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to