This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d0010dee [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine
8d0010dee is described below

commit 8d0010dee079c06b5fca87da4cbf13cdd7429c7d
Author: Paul Lin <[email protected]>
AuthorDate: Mon Jul 10 12:29:55 2023 +0800

    [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine
    
    ### _Why are the changes needed?_
    Implements Kyuubi-defined functions (KDFs) in the Flink SQL engine, matching the built-in functions already provided by the Spark engine: `kyuubi_version`, `kyuubi_engine_name`, `kyuubi_engine_id`, `kyuubi_system_user`, and `kyuubi_session_user`.
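    
    For illustration only (not part of this patch): a minimal JDBC client sketch showing how the new functions could be queried once the Flink engine is running. The connection URL is a placeholder, and the sketch assumes a Hive-compatible JDBC driver (e.g. the Kyuubi Hive JDBC driver) is on the classpath.
    
        import java.sql.DriverManager
    
        object KyuubiFlinkKdfExample {
          def main(args: Array[String]): Unit = {
            // Hypothetical Kyuubi endpoint; adjust host/port to your deployment.
            val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
            val stmt = conn.createStatement()
            try {
              // Each KDF is registered as a temporary system function per session.
              val rs = stmt.executeQuery(
                "SELECT kyuubi_version(), kyuubi_engine_name(), kyuubi_session_user()")
              if (rs.next()) {
                println(s"version=${rs.getString(1)}, engine=${rs.getString(2)}, user=${rs.getString(3)}")
              }
            } finally {
              stmt.close()
              conn.close()
            }
          }
        }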
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5014 from link3280/KYUUBI-4938.
    
    Closes #4938
    
    c43d480f9 [Paul Lin] [KYUUBI #4938][FLINK] Update function description
    0dd991f03 [Paul Lin] [KYUUBI #4938][FLINK] Fix compatibility problems with Flink 1.16
    7e6a3b184 [Paul Lin] [KYUUBI #4938][FLINK] Fix inconsistent instant in engine names
    6ecde4c60 [Paul Lin] [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  53 +++++---
 .../engine/flink/session/FlinkSessionImpl.scala    |   5 +-
 .../kyuubi/engine/flink/udf/KDFRegistry.scala      | 150 +++++++++++++++++++++
 .../engine/flink}/udf/KyuubiDefinedFunction.scala  |   6 +-
 .../flink/operation/FlinkOperationLocalSuite.scala |  29 +++-
 .../operation/FlinkOperationOnYarnSuite.scala      |  29 +++-
 .../engine/spark/udf/KyuubiDefinedFunction.scala   |   2 +-
 7 files changed, 247 insertions(+), 27 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 48a354b0f..99a9ee56b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.util.SignalRegister
@@ -92,26 +93,7 @@ object FlinkSQLEngine extends Logging {
       flinkConf.addAll(Configuration.fromMap(flinkConfFromArgs.asJava))
 
       val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
-      // set cluster name for per-job and application mode
-      executionTarget match {
-        case "yarn-per-job" | "yarn-application" =>
-          if (!flinkConf.containsKey("yarn.application.name")) {
-            val appName = s"kyuubi_${user}_flink_${Instant.now}"
-            flinkConf.setString("yarn.application.name", appName)
-          }
-          if (flinkConf.containsKey("high-availability.cluster-id")) {
-            flinkConf.setString(
-              "yarn.application.id",
-              flinkConf.toMap.get("high-availability.cluster-id"))
-          }
-        case "kubernetes-application" =>
-          if (!flinkConf.containsKey("kubernetes.cluster-id")) {
-            val appName = s"kyuubi-${user}-flink-${Instant.now}"
-            flinkConf.setString("kubernetes.cluster-id", appName)
-          }
-        case other =>
-          debug(s"Skip generating app name for execution target $other")
-      }
+      setDeploymentConf(executionTarget, flinkConf)
 
       kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
@@ -153,4 +135,35 @@ object FlinkSQLEngine extends Logging {
     res.await()
     info("Initial Flink SQL finished.")
   }
+
+  private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {
+    // forward kyuubi engine variables to flink configuration
+    val instant = Instant.now
+    val engineName = s"kyuubi_${user}_flink_$instant"
+    flinkConf.setString(KYUUBI_ENGINE_NAME, engineName)
+
+    kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).foreach(user =>
+      flinkConf.setString(KYUUBI_SESSION_USER_KEY, user))
+
+    // set cluster name for per-job and application mode
+    executionTarget match {
+      case "yarn-per-job" | "yarn-application" =>
+        if (!flinkConf.containsKey("yarn.application.name")) {
+          val appName = engineName
+          flinkConf.setString("yarn.application.name", appName)
+        }
+        if (flinkConf.containsKey("high-availability.cluster-id")) {
+          flinkConf.setString(
+            "yarn.application.id",
+            flinkConf.toMap.get("high-availability.cluster-id"))
+        }
+      case "kubernetes-application" =>
+        if (!flinkConf.containsKey("kubernetes.cluster-id")) {
+          val appName = s"kyuubi-${user}-flink-$instant"
+          flinkConf.setString("kubernetes.cluster-id", appName)
+        }
+      case other =>
+        debug(s"Skip generating app name for execution target $other")
+    }
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 10a48f1a1..b8d1f8569 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,6 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.engine.flink.udf.KDFRegistry
 import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
 
 class FlinkSessionImpl(
@@ -46,10 +47,12 @@ class FlinkSessionImpl(
     conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
       .getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString))
 
-  lazy val sessionContext: SessionContext = {
+  val sessionContext: SessionContext = {
     FlinkEngineUtils.getSessionContext(fSession)
   }
 
+  KDFRegistry.registerAll(sessionContext)
+
   private def setModifiableConfig(key: String, value: String): Unit = {
     try {
       sessionContext.set(key, value)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
new file mode 100644
index 000000000..b6729cff3
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.udf
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
+import org.apache.flink.table.gateway.service.context.SessionContext
+
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.util.reflect.DynMethods
+
+object KDFRegistry {
+
+  def createKyuubiDefinedFunctions(sessionContext: SessionContext): Array[KyuubiDefinedFunction] = {
+
+    val kyuubiDefinedFunctions = new ArrayBuffer[KyuubiDefinedFunction]
+
+    val flinkConfigMap: util.Map[String, String] = {
+      if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+        DynMethods
+          .builder("getConfigMap")
+          .impl(classOf[SessionContext])
+          .build()
+          .invoke(sessionContext)
+          .asInstanceOf[util.Map[String, String]]
+      } else {
+        DynMethods
+          .builder("getSessionConf")
+          .impl(classOf[SessionContext])
+          .build()
+          .invoke(sessionContext)
+          .asInstanceOf[Configuration]
+          .toMap
+      }
+    }
+
+    val kyuubi_version: KyuubiDefinedFunction = create(
+      "kyuubi_version",
+      new KyuubiVersionFunction(flinkConfigMap),
+      "Return the version of Kyuubi Server",
+      "string",
+      "1.8.0")
+    kyuubiDefinedFunctions += kyuubi_version
+
+    val engineName: KyuubiDefinedFunction = create(
+      "kyuubi_engine_name",
+      new EngineNameFunction(flinkConfigMap),
+      "Return the application name for the associated query engine",
+      "string",
+      "1.8.0")
+    kyuubiDefinedFunctions += engineName
+
+    val engineId: KyuubiDefinedFunction = create(
+      "kyuubi_engine_id",
+      new EngineIdFunction(flinkConfigMap),
+      "Return the application id for the associated query engine",
+      "string",
+      "1.8.0")
+    kyuubiDefinedFunctions += engineId
+
+    val systemUser: KyuubiDefinedFunction = create(
+      "kyuubi_system_user",
+      new SystemUserFunction(flinkConfigMap),
+      "Return the system user name for the associated query engine",
+      "string",
+      "1.8.0")
+    kyuubiDefinedFunctions += systemUser
+
+    val sessionUser: KyuubiDefinedFunction = create(
+      "kyuubi_session_user",
+      new SessionUserFunction(flinkConfigMap),
+      "Return the session username for the associated query engine",
+      "string",
+      "1.8.0")
+    kyuubiDefinedFunctions += sessionUser
+
+    kyuubiDefinedFunctions.toArray
+  }
+
+  def create(
+      name: String,
+      udf: UserDefinedFunction,
+      description: String,
+      returnType: String,
+      since: String): KyuubiDefinedFunction = {
+    val kdf = KyuubiDefinedFunction(name, udf, description, returnType, since)
+    kdf
+  }
+
+  def registerAll(sessionContext: SessionContext): Unit = {
+    val functions = createKyuubiDefinedFunctions(sessionContext)
+    for (func <- functions) {
+      sessionContext.getSessionState.functionCatalog
+        .registerTemporarySystemFunction(func.name, func.udf, true)
+    }
+  }
+}
+
+class KyuubiVersionFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = KYUUBI_VERSION
+}
+
+class EngineNameFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = {
+    confMap match {
+      case m if m.containsKey("yarn.application.name") => m.get("yarn.application.name")
+      case m if m.containsKey("kubernetes.cluster-id") => m.get("kubernetes.cluster-id")
+      case m => m.getOrDefault(KYUUBI_ENGINE_NAME, "unknown-engine-name")
+    }
+  }
+}
+
+class EngineIdFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = {
+    confMap match {
+      case m if m.containsKey("yarn.application.id") => m.get("yarn.application.id")
+      case m if m.containsKey("kubernetes.cluster-id") => m.get("kubernetes.cluster-id")
+      case m => m.getOrDefault("high-availability.cluster-id", "unknown-engine-id")
+    }
+  }
+}
+
+class SystemUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = Utils.currentUser
+}
+
+class SessionUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = confMap.getOrDefault(KYUUBI_SESSION_USER_KEY, "unknown-user")
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
similarity index 87%
copy from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
copy to externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
index 30228bf72..5cfce86d6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.engine.spark.udf
+package org.apache.kyuubi.engine.flink.udf
 
-import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.flink.table.functions.UserDefinedFunction
 
 /**
- * A wrapper for Spark' [[UserDefinedFunction]]
+ * A wrapper for Flink's [[UserDefinedFunction]]
  *
  * @param name function name
  * @param udf user-defined function
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index 0f4b38d36..b8f6768cc 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.util.UUID
 
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
@@ -33,11 +35,13 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
   override def withKyuubiConf: Map[String, String] = {
     Map(
       "flink.execution.target" -> "remote",
+      "flink.high-availability.cluster-id" -> "flink-mini-cluster",
       HA_NAMESPACE.key -> namespace,
       HA_ENGINE_REF_ID.key -> engineRefId,
       ENGINE_TYPE.key -> "FLINK_SQL",
       ENGINE_SHARE_LEVEL.key -> shareLevel,
-      OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf
+      OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
+      KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
   }
 
   override protected def engineRefId: String = UUID.randomUUID().toString
@@ -48,4 +52,27 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
 
   def engineType: String = "flink"
 
+  test("execute statement - kyuubi defined functions") {
+    withJdbcStatement() { statement =>
+      var resultSet = statement.executeQuery("select kyuubi_version() as kyuubi_version")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === KYUUBI_VERSION)
+
+      resultSet = statement.executeQuery("select kyuubi_engine_name() as engine_name")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+
+      resultSet = statement.executeQuery("select kyuubi_engine_id() as engine_id")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "flink-mini-cluster")
+
+      resultSet = statement.executeQuery("select kyuubi_system_user() as `system_user`")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === Utils.currentUser)
+
+      resultSet = statement.executeQuery("select kyuubi_session_user() as `session_user`")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "paullin")
+    }
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index 931d500f7..25e23d82a 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.util.UUID
 
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineOnYarn}
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
@@ -34,7 +36,8 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
       HA_NAMESPACE.key -> namespace,
       HA_ENGINE_REF_ID.key -> engineRefId,
       ENGINE_TYPE.key -> "FLINK_SQL",
-      ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf
+      ENGINE_SHARE_LEVEL.key -> shareLevel,
+      KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
   }
 
   override protected def engineRefId: String = UUID.randomUUID().toString
@@ -44,4 +47,28 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
   def shareLevel: String = ShareLevel.USER.toString
 
   def engineType: String = "flink"
+
+  test("execute statement - kyuubi defined functions") {
+    withJdbcStatement() { statement =>
+      var resultSet = statement.executeQuery("select kyuubi_version() as kyuubi_version")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === KYUUBI_VERSION)
+
+      resultSet = statement.executeQuery("select kyuubi_engine_name() as engine_name")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+
+      resultSet = statement.executeQuery("select kyuubi_engine_id() as engine_id")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).startsWith("application_"))
+
+      resultSet = statement.executeQuery("select kyuubi_system_user() as `system_user`")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === Utils.currentUser)
+
+      resultSet = statement.executeQuery("select kyuubi_session_user() as `session_user`")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "paullin")
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
index 30228bf72..6bc2e3ddb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.udf
 import org.apache.spark.sql.expressions.UserDefinedFunction
 
 /**
- * A wrapper for Spark' [[UserDefinedFunction]]
+ * A wrapper for Spark's [[UserDefinedFunction]]
  *
  * @param name function name
  * @param udf user-defined function
