This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 64dd50876 [KYUUBI #4940] Implement Kyuubi UDF in Hive engine
64dd50876 is described below
commit 64dd50876f2e1535f65f69e48486c6fc1dea2dc5
Author: senmiaoliu <[email protected]>
AuthorDate: Tue Aug 1 17:07:07 2023 +0800
[KYUUBI #4940] Implement Kyuubi UDF in Hive engine
### _Why are the changes needed?_
close #4940
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5110 from lsm1/features/kyuubi_4940.
Closes #4940
6c0a9a37f [senmiaoliu] add kdf for hive engine
Authored-by: senmiaoliu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/engine/hive/HiveSQLEngine.scala | 4 +
.../engine/hive/operation/HiveOperation.scala | 3 +
.../engine/hive/session/HiveSessionImpl.scala | 2 +
.../kyuubi/engine/hive/udf/KDFRegistry.scala | 169 +++++++++++++++++++++
.../engine/hive/udf/KyuubiDefinedFunction.scala | 34 +++++
.../engine/hive/operation/HiveOperationSuite.scala | 18 ++-
.../KyuubiOperationHiveEnginePerUserSuite.scala | 17 +++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 1 +
.../kyuubi/engine/hive/HiveProcessBuilder.scala | 5 +-
9 files changed, 251 insertions(+), 2 deletions(-)
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index 839da710e..3cc426c43 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.hive
import java.security.PrivilegedExceptionAction
+import java.time.Instant
import scala.util.control.NonFatal
@@ -65,6 +66,7 @@ object HiveSQLEngine extends Logging {
var currentEngine: Option[HiveSQLEngine] = None
val hiveConf = new HiveConf()
val kyuubiConf = new KyuubiConf()
+ val user = UserGroupInformation.getCurrentUser.getShortUserName
def startEngine(): Unit = {
try {
@@ -97,6 +99,8 @@ object HiveSQLEngine extends Logging {
}
val engine = new HiveSQLEngine()
+ val appName = s"kyuubi_${user}_hive_${Instant.now}"
+ hiveConf.setIfUnset("hive.engine.name", appName)
info(s"Starting ${engine.getName}")
engine.initialize(kyuubiConf)
EventBus.post(HiveEngineEvent(engine))
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 2df4a072a..c7166356d 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.session.{HiveSession,
SessionManager => HiveS
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, FetchOrientation,
OperationState, OperationStatus}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -43,12 +44,14 @@ abstract class HiveOperation(session: Session) extends
AbstractOperation(session
override def beforeRun(): Unit = {
setState(OperationState.RUNNING)
+ hive.getHiveConf.set(KYUUBI_SESSION_USER_KEY, session.user)
}
override def afterRun(): Unit = {
withLockRequired {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
+ hive.getHiveConf.unset(KYUUBI_SESSION_USER_KEY)
}
}
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
index 3b85f94df..5069b1379 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
@@ -27,6 +27,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType,
TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.hive.events.HiveSessionEvent
+import org.apache.kyuubi.engine.hive.udf.KDFRegistry
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle,
SessionManager}
@@ -48,6 +49,7 @@ class HiveSessionImpl(
val confClone = new HashMap[String, String]()
confClone.putAll(conf.asJava) // pass conf.asScala not support `put` method
hive.open(confClone)
+ KDFRegistry.registerAll()
EventBus.post(sessionEvent)
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala
new file mode 100644
index 000000000..5ff468b77
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.udf
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry,
UDFArgumentLengthException}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.{PrimitiveObjectInspectorFactory,
StringObjectInspector}
+
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID,
KYUUBI_SESSION_USER_KEY}
+
+object KDFRegistry {
+
+ @transient
+ val registeredFunctions = new ArrayBuffer[KyuubiDefinedFunction]()
+
+ val kyuubi_version: KyuubiDefinedFunction = create(
+ "kyuubi_version",
+ new KyuubiVersionFunction,
+ "Return the version of Kyuubi Server",
+ "string",
+ "1.8.0")
+
+ val engine_name: KyuubiDefinedFunction = create(
+ "engine_name",
+ new EngineNameFunction,
+ "Return the name of engine",
+ "string",
+ "1.8.0")
+
+ val engine_id: KyuubiDefinedFunction = create(
+ "engine_id",
+ new EngineIdFunction,
+ "Return the id of engine",
+ "string",
+ "1.8.0")
+
+ val system_user: KyuubiDefinedFunction = create(
+ "system_user",
+ new SystemUserFunction,
+ "Return the system user",
+ "string",
+ "1.8.0")
+
+ val session_user: KyuubiDefinedFunction = create(
+ "session_user",
+ new SessionUserFunction,
+ "Return the session user",
+ "string",
+ "1.8.0")
+
+ def create(
+ name: String,
+ udf: GenericUDF,
+ description: String,
+ returnType: String,
+ since: String): KyuubiDefinedFunction = {
+ val kdf = KyuubiDefinedFunction(name, udf, description, returnType, since)
+ registeredFunctions += kdf
+ kdf
+ }
+
+ def registerAll(): Unit = {
+ for (func <- registeredFunctions) {
+ FunctionRegistry.registerTemporaryUDF(func.name, func.udf.getClass)
+ }
+ }
+}
+
+class KyuubiVersionFunction() extends GenericUDF {
+ private val returnOI: StringObjectInspector =
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector
+ override def initialize(arguments: Array[ObjectInspector]): ObjectInspector
= {
+ if (arguments.length != 0) {
+ throw new UDFArgumentLengthException("The function kyuubi_version()
takes no arguments, got "
+ + arguments.length)
+ }
+ returnOI
+ }
+
+ override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
KYUUBI_VERSION
+
+ override def getDisplayString(children: Array[String]): String =
"kyuubi_version()"
+}
+
+class EngineNameFunction() extends GenericUDF {
+ private val returnOI: StringObjectInspector =
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector
+ override def initialize(arguments: Array[ObjectInspector]): ObjectInspector
= {
+ if (arguments.length != 0) {
+ throw new UDFArgumentLengthException("The function engine_name() takes
no arguments, got "
+ + arguments.length)
+ }
+ returnOI
+ }
+ override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
+ SessionState.get.getConf.get("hive.engine.name", "")
+ override def getDisplayString(children: Array[String]): String =
"engine_name()"
+}
+
+class EngineIdFunction() extends GenericUDF {
+ private val returnOI: StringObjectInspector =
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector
+ override def initialize(arguments: Array[ObjectInspector]): ObjectInspector
= {
+ if (arguments.length != 0) {
+ throw new UDFArgumentLengthException("The function engine_id() takes no
arguments, got "
+ + arguments.length)
+ }
+ returnOI
+ }
+
+ override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
+ SessionState.get.getConf.get(KYUUBI_ENGINE_ID, "")
+
+ override def getDisplayString(children: Array[String]): String =
"engine_id()"
+}
+
+class SystemUserFunction() extends GenericUDF {
+ private val returnOI: StringObjectInspector =
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector
+ override def initialize(arguments: Array[ObjectInspector]): ObjectInspector
= {
+ if (arguments.length != 0) {
+ throw new UDFArgumentLengthException("The function system_user() takes
no arguments, got "
+ + arguments.length)
+ }
+ returnOI
+ }
+
+ override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
Utils.currentUser
+
+ override def getDisplayString(children: Array[String]): String =
"system_user()"
+}
+
+class SessionUserFunction() extends GenericUDF {
+ private val returnOI: StringObjectInspector =
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector
+ override def initialize(arguments: Array[ObjectInspector]): ObjectInspector
= {
+ if (arguments.length != 0) {
+ throw new UDFArgumentLengthException("The function session_user() takes
no arguments, got "
+ + arguments.length)
+ }
+ returnOI
+ }
+
+ override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
{
+ SessionState.get.getConf.get(KYUUBI_SESSION_USER_KEY, "")
+ }
+
+ override def getDisplayString(children: Array[String]): String =
"session_user()"
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala
new file mode 100644
index 000000000..ee91a804e
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.udf
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+
+/**
+ * A wrapper for Hive's [[GenericUDF]]
+ *
+ * @param name function name
+ * @param udf user-defined function
+ * @param description function description
+ */
+case class KyuubiDefinedFunction(
+ name: String,
+ udf: GenericUDF,
+ description: String,
+ returnType: String,
+ since: String)
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index f949ec37a..eb10e0b41 100644
---
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.hive.operation
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-import org.apache.kyuubi.{HiveEngineTests, Utils}
+import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
@@ -49,4 +49,20 @@ class HiveOperationSuite extends HiveEngineTests {
assert(kyuubiStatement.getQueryId != null)
}
}
+
+ test("kyuubi defined function - kyuubi_version") {
+ withJdbcStatement("hive_engine_test") { statement =>
+ val rs = statement.executeQuery("SELECT kyuubi_version()")
+ assert(rs.next())
+ assert(rs.getString(1) == KYUUBI_VERSION)
+ }
+ }
+
+ test("kyuubi defined function - engine_name") {
+ withJdbcStatement("hive_engine_test") { statement =>
+ val rs = statement.executeQuery("SELECT engine_name()")
+ assert(rs.next())
+ assert(rs.getString(1).nonEmpty)
+ }
+ }
}
diff --git
a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
index a4e6bb150..07e2bc0f2 100644
---
a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
+++
b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
@@ -61,4 +61,21 @@ class KyuubiOperationHiveEnginePerUserSuite extends
WithKyuubiServer with HiveEn
}
}
}
+
+ test("kyuubi defined function - system_user, session_user") {
+ withJdbcStatement("hive_engine_test") { statement =>
+ val rs = statement.executeQuery("SELECT system_user(), session_user()")
+ assert(rs.next())
+ assert(rs.getString(1) === Utils.currentUser)
+ assert(rs.getString(2) === Utils.currentUser)
+ }
+ }
+
+ test("kyuubi defined function - engine_id") {
+ withJdbcStatement("hive_engine_test") { statement =>
+ val rs = statement.executeQuery("SELECT engine_id()")
+ assert(rs.next())
+ assert(rs.getString(1).nonEmpty)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index ae40fbd4f..052f1de0f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -195,6 +195,7 @@ private[kyuubi] class EngineRef(
case TRINO =>
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case HIVE_SQL =>
+ conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME,
defaultEngineName)
new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case JDBC =>
new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index e86597c5c..61fe55887 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -29,7 +29,7 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH,
ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
@@ -106,6 +106,8 @@ class HiveProcessBuilder(
buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+ buffer += "--conf"
+ buffer += s"$KYUUBI_ENGINE_ID=$engineRefId"
for ((k, v) <- conf.getAll) {
buffer += "--conf"
@@ -121,4 +123,5 @@ class HiveProcessBuilder(
object HiveProcessBuilder {
final val HIVE_HADOOP_CLASSPATH_KEY = "HIVE_HADOOP_CLASSPATH"
+ final val HIVE_ENGINE_NAME = "hive.engine.name"
}