This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8d0010dee [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine
8d0010dee is described below
commit 8d0010dee079c06b5fca87da4cbf13cdd7429c7d
Author: Paul Lin <[email protected]>
AuthorDate: Mon Jul 10 12:29:55 2023 +0800
[KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine
### _Why are the changes needed?_
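As titled: add the Kyuubi-defined functions `kyuubi_version`, `kyuubi_engine_name`, `kyuubi_engine_id`, `kyuubi_system_user` and `kyuubi_session_user` to the Flink engine.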
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5014 from link3280/KYUUBI-4938.
Closes #4938
c43d480f9 [Paul Lin] [KYUUBI #4938][FLINK] Update function description
0dd991f03 [Paul Lin] [KYUUBI #4938][FLINK] Fix compatibility problems with
Flink 1.16
7e6a3b184 [Paul Lin] [KYUUBI #4938][FLINK] Fix inconsistent instant in
engine names
6ecde4c60 [Paul Lin] [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink
engine
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 53 +++++---
.../engine/flink/session/FlinkSessionImpl.scala | 5 +-
.../kyuubi/engine/flink/udf/KDFRegistry.scala | 150 +++++++++++++++++++++
.../engine/flink}/udf/KyuubiDefinedFunction.scala | 6 +-
.../flink/operation/FlinkOperationLocalSuite.scala | 29 +++-
.../operation/FlinkOperationOnYarnSuite.scala | 29 +++-
.../engine/spark/udf/KyuubiDefinedFunction.scala | 2 +-
7 files changed, 247 insertions(+), 27 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 48a354b0f..99a9ee56b 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -31,6 +31,7 @@ import
org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser,
FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch,
currentEngine}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
@@ -92,26 +93,7 @@ object FlinkSQLEngine extends Logging {
flinkConf.addAll(Configuration.fromMap(flinkConfFromArgs.asJava))
val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
- // set cluster name for per-job and application mode
- executionTarget match {
- case "yarn-per-job" | "yarn-application" =>
- if (!flinkConf.containsKey("yarn.application.name")) {
- val appName = s"kyuubi_${user}_flink_${Instant.now}"
- flinkConf.setString("yarn.application.name", appName)
- }
- if (flinkConf.containsKey("high-availability.cluster-id")) {
- flinkConf.setString(
- "yarn.application.id",
- flinkConf.toMap.get("high-availability.cluster-id"))
- }
- case "kubernetes-application" =>
- if (!flinkConf.containsKey("kubernetes.cluster-id")) {
- val appName = s"kyuubi-${user}-flink-${Instant.now}"
- flinkConf.setString("kubernetes.cluster-id", appName)
- }
- case other =>
- debug(s"Skip generating app name for execution target $other")
- }
+ setDeploymentConf(executionTarget, flinkConf)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
@@ -153,4 +135,35 @@ object FlinkSQLEngine extends Logging {
res.await()
info("Initial Flink SQL finished.")
}
+
+  /**
+   * Prepares deployment-related entries in `flinkConf` before the engine starts.
+   *
+   * Always records a generated engine name under `KYUUBI_ENGINE_NAME`. When the Kyuubi conf
+   * carries a session user, it is forwarded under `KYUUBI_SESSION_USER_KEY`. These entries are
+   * presumably there so the Kyuubi-defined functions can read them back from the session
+   * configuration. For YARN per-job/application and Kubernetes application targets, a cluster
+   * name is generated when the user did not configure one.
+   *
+   * @param executionTarget value of Flink's `execution.target` option
+   * @param flinkConf       Flink configuration, updated in place
+   */
+  private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {
+    // One instant is shared so the engine name and the generated cluster names agree.
+    val instant = Instant.now
+    val engineName = s"kyuubi_${user}_flink_$instant"
+    flinkConf.setString(KYUUBI_ENGINE_NAME, engineName)
+
+    // forward the session user (distinct from the engine's own `user`) when it is known
+    kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).foreach { sessionUser =>
+      flinkConf.setString(KYUUBI_SESSION_USER_KEY, sessionUser)
+    }
+
+    // set cluster name for per-job and application mode
+    executionTarget match {
+      case "yarn-per-job" | "yarn-application" =>
+        if (!flinkConf.containsKey("yarn.application.name")) {
+          flinkConf.setString("yarn.application.name", engineName)
+        }
+        // reuse the configured HA cluster id as the YARN application id
+        if (flinkConf.containsKey("high-availability.cluster-id")) {
+          flinkConf.setString(
+            "yarn.application.id",
+            flinkConf.toMap.get("high-availability.cluster-id"))
+        }
+      case "kubernetes-application" =>
+        if (!flinkConf.containsKey("kubernetes.cluster-id")) {
+          // NOTE(review): Instant#toString contains characters such as ':' and upper-case
+          // 'T'/'Z', which Kubernetes resource names reject. Confirm this default is ever used.
+          flinkConf.setString("kubernetes.cluster-id", s"kyuubi-${user}-flink-$instant")
+        }
+      case other =>
+        debug(s"Skip generating app name for execution target $other")
+    }
+  }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 10a48f1a1..b8d1f8569 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,6 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType,
TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.engine.flink.udf.KDFRegistry
import org.apache.kyuubi.session.{AbstractSession, SessionHandle,
SessionManager, USE_CATALOG, USE_DATABASE}
class FlinkSessionImpl(
@@ -46,10 +47,12 @@ class FlinkSessionImpl(
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
.getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString))
- lazy val sessionContext: SessionContext = {
+  // Resolved eagerly: the Kyuubi-defined functions are registered into this
+  // context while the session object is being constructed.
+  val sessionContext: SessionContext = FlinkEngineUtils.getSessionContext(fSession)
+  KDFRegistry.registerAll(sessionContext)
+
private def setModifiableConfig(key: String, value: String): Unit = {
try {
sessionContext.set(key, value)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
new file mode 100644
index 000000000..b6729cff3
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.udf
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
+import org.apache.flink.table.gateway.service.context.SessionContext
+
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME,
KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.util.reflect.DynMethods
+
+object KDFRegistry {
+
+  /**
+   * Builds every Kyuubi-defined function (KDF) for one Flink session.
+   *
+   * Each function receives the session configuration as a `java.util.Map`. The map is read
+   * through reflection because Flink 1.16 uses a different accessor than other versions.
+   *
+   * @param sessionContext session whose configuration the functions read
+   * @return the KDFs, in registration order
+   */
+  def createKyuubiDefinedFunctions(
+      sessionContext: SessionContext): Array[KyuubiDefinedFunction] = {
+    val confMap = sessionConfigMap(sessionContext)
+
+    ArrayBuffer(
+      create(
+        "kyuubi_version",
+        new KyuubiVersionFunction(confMap),
+        "Return the version of Kyuubi Server",
+        "string",
+        "1.8.0"),
+      create(
+        "kyuubi_engine_name",
+        new EngineNameFunction(confMap),
+        "Return the application name for the associated query engine",
+        "string",
+        "1.8.0"),
+      create(
+        "kyuubi_engine_id",
+        new EngineIdFunction(confMap),
+        "Return the application id for the associated query engine",
+        "string",
+        "1.8.0"),
+      create(
+        "kyuubi_system_user",
+        new SystemUserFunction(confMap),
+        "Return the system user name for the associated query engine",
+        "string",
+        "1.8.0"),
+      create(
+        "kyuubi_session_user",
+        new SessionUserFunction(confMap),
+        "Return the session username for the associated query engine",
+        "string",
+        "1.8.0")).toArray
+  }
+
+  /**
+   * Reads the session configuration.
+   * Flink 1.16 exposes `getConfigMap`; other versions expose `getSessionConf`.
+   */
+  private def sessionConfigMap(sessionContext: SessionContext): util.Map[String, String] = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      // already a String-to-String map
+      DynMethods
+        .builder("getConfigMap")
+        .impl(classOf[SessionContext])
+        .build()
+        .invoke(sessionContext)
+        .asInstanceOf[util.Map[String, String]]
+    } else {
+      // a Flink Configuration, flattened with toMap
+      DynMethods
+        .builder("getSessionConf")
+        .impl(classOf[SessionContext])
+        .build()
+        .invoke(sessionContext)
+        .asInstanceOf[Configuration]
+        .toMap
+    }
+  }
+
+  /** Pairs a Flink UDF with its name, description, return type and `since` version. */
+  def create(
+      name: String,
+      udf: UserDefinedFunction,
+      description: String,
+      returnType: String,
+      since: String): KyuubiDefinedFunction =
+    KyuubiDefinedFunction(name, udf, description, returnType, since)
+
+  /**
+   * Registers every KDF as a temporary system function in the session's function catalog.
+   * The trailing `true` is the catalog's ignore-if-exists flag.
+   */
+  def registerAll(sessionContext: SessionContext): Unit = {
+    createKyuubiDefinedFunctions(sessionContext).foreach { kdf =>
+      sessionContext.getSessionState.functionCatalog
+        .registerTemporarySystemFunction(kdf.name, kdf.udf, true)
+    }
+  }
+}
+
+/**
+ * Scalar function behind `kyuubi_version()`: returns the `KYUUBI_VERSION` constant.
+ * `confMap` is accepted like the other KDF constructors but is not read.
+ */
+class KyuubiVersionFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = {
+    KYUUBI_VERSION
+  }
+}
+
+/**
+ * Scalar function behind `kyuubi_engine_name()`.
+ *
+ * Lookup order:
+ *  1. the YARN application name;
+ *  2. the Kubernetes cluster id;
+ *  3. the `KYUUBI_ENGINE_NAME` entry.
+ *
+ * Answers "unknown-engine-name" when none of these is present.
+ */
+class EngineNameFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String =
+    if (confMap.containsKey("yarn.application.name")) {
+      confMap.get("yarn.application.name")
+    } else if (confMap.containsKey("kubernetes.cluster-id")) {
+      confMap.get("kubernetes.cluster-id")
+    } else {
+      confMap.getOrDefault(KYUUBI_ENGINE_NAME, "unknown-engine-name")
+    }
+}
+
+/**
+ * Scalar function behind `kyuubi_engine_id()`.
+ *
+ * Lookup order:
+ *  1. the YARN application id;
+ *  2. the Kubernetes cluster id;
+ *  3. the high-availability cluster id.
+ *
+ * Answers "unknown-engine-id" when none of these is present.
+ */
+class EngineIdFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String =
+    if (confMap.containsKey("yarn.application.id")) {
+      confMap.get("yarn.application.id")
+    } else if (confMap.containsKey("kubernetes.cluster-id")) {
+      confMap.get("kubernetes.cluster-id")
+    } else {
+      confMap.getOrDefault("high-availability.cluster-id", "unknown-engine-id")
+    }
+}
+
+/**
+ * Scalar function behind `kyuubi_system_user()`: returns `Utils.currentUser`.
+ * `confMap` is accepted like the other KDF constructors but is not read.
+ */
+class SystemUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = {
+    Utils.currentUser
+  }
+}
+
+/**
+ * Scalar function behind `kyuubi_session_user()`.
+ * Returns the `KYUUBI_SESSION_USER_KEY` entry of the configuration, or "unknown-user" when
+ * that key is absent.
+ */
+class SessionUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
+  def eval(): String = {
+    val fallback = "unknown-user"
+    confMap.getOrDefault(KYUUBI_SESSION_USER_KEY, fallback)
+  }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
similarity index 87%
copy from
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
copy to
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
index 30228bf72..5cfce86d6 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.spark.udf
+package org.apache.kyuubi.engine.flink.udf
-import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.flink.table.functions.UserDefinedFunction
/**
- * A wrapper for Spark' [[UserDefinedFunction]]
+ * A wrapper for Flink's [[UserDefinedFunction]]
*
* @param name function name
* @param udf user-defined function
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index 0f4b38d36..b8f6768cc 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
import java.util.UUID
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine,
WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
@@ -33,11 +35,13 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
override def withKyuubiConf: Map[String, String] = {
Map(
"flink.execution.target" -> "remote",
+ // value that kyuubi_engine_id() is expected to return in the KDF test of this suite
+ "flink.high-availability.cluster-id" -> "flink-mini-cluster",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
- OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf
+ OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
+ // session user that kyuubi_session_user() is expected to return in the KDF test
+ KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
}
override protected def engineRefId: String = UUID.randomUUID().toString
@@ -48,4 +52,27 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
def engineType: String = "flink"
+  test("execute statement - kyuubi defined functions") {
+    withJdbcStatement() { statement =>
+      // executes `sql`, asserts that it produced a row and returns that row's first column
+      def firstValue(sql: String): String = {
+        val rs = statement.executeQuery(sql)
+        assert(rs.next())
+        rs.getString(1)
+      }
+
+      assert(firstValue("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION)
+      assert(firstValue("select kyuubi_engine_name() as engine_name")
+        .startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+      // matches flink.high-availability.cluster-id configured for this suite
+      assert(firstValue("select kyuubi_engine_id() as engine_id") === "flink-mini-cluster")
+      assert(firstValue("select kyuubi_system_user() as `system_user`") === Utils.currentUser)
+      assert(firstValue("select kyuubi_session_user() as `session_user`") === "paullin")
+    }
+  }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index 931d500f7..25e23d82a 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
import java.util.UUID
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine,
WithFlinkSQLEngineOnYarn}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
@@ -34,7 +36,8 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
- ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf
+ ENGINE_SHARE_LEVEL.key -> shareLevel,
+ KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
}
override protected def engineRefId: String = UUID.randomUUID().toString
@@ -44,4 +47,28 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
def shareLevel: String = ShareLevel.USER.toString
def engineType: String = "flink"
+
+  test("execute statement - kyuubi defined functions") {
+    withJdbcStatement() { statement =>
+      // executes `sql`, asserts that it produced a row and returns that row's first column
+      def firstValue(sql: String): String = {
+        val rs = statement.executeQuery(sql)
+        assert(rs.next())
+        rs.getString(1)
+      }
+
+      assert(firstValue("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION)
+      assert(firstValue("select kyuubi_engine_name() as engine_name")
+        .startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+      // expected to look like a YARN application id
+      assert(firstValue("select kyuubi_engine_id() as engine_id").startsWith("application_"))
+      assert(firstValue("select kyuubi_system_user() as `system_user`") === Utils.currentUser)
+      assert(firstValue("select kyuubi_session_user() as `session_user`") === "paullin")
+    }
+  }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
index 30228bf72..6bc2e3ddb 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
/**
- * A wrapper for Spark' [[UserDefinedFunction]]
+ * A wrapper for Spark's [[UserDefinedFunction]]
*
* @param name function name
* @param udf user-defined function