This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64dd50876 [KYUUBI #4940] Implement Kyuubi UDF in Hive engine
64dd50876 is described below

commit 64dd50876f2e1535f65f69e48486c6fc1dea2dc5
Author: senmiaoliu <[email protected]>
AuthorDate: Tue Aug 1 17:07:07 2023 +0800

    [KYUUBI #4940] Implement Kyuubi UDF in Hive engine
    
    ### _Why are the changes needed?_
    
    close #4940
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5110 from lsm1/features/kyuubi_4940.
    
    Closes #4940
    
    6c0a9a37f [senmiaoliu] add kdf for hive engine
    
    Authored-by: senmiaoliu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/engine/hive/HiveSQLEngine.scala  |   4 +
 .../engine/hive/operation/HiveOperation.scala      |   3 +
 .../engine/hive/session/HiveSessionImpl.scala      |   2 +
 .../kyuubi/engine/hive/udf/KDFRegistry.scala       | 169 +++++++++++++++++++++
 .../engine/hive/udf/KyuubiDefinedFunction.scala    |  34 +++++
 .../engine/hive/operation/HiveOperationSuite.scala |  18 ++-
 .../KyuubiOperationHiveEnginePerUserSuite.scala    |  17 +++
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |   1 +
 .../kyuubi/engine/hive/HiveProcessBuilder.scala    |   5 +-
 9 files changed, 251 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index 839da710e..3cc426c43 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine.hive
 
 import java.security.PrivilegedExceptionAction
+import java.time.Instant
 
 import scala.util.control.NonFatal
 
@@ -65,6 +66,7 @@ object HiveSQLEngine extends Logging {
   var currentEngine: Option[HiveSQLEngine] = None
   val hiveConf = new HiveConf()
   val kyuubiConf = new KyuubiConf()
+  val user = UserGroupInformation.getCurrentUser.getShortUserName // part of the default engine name
 
   def startEngine(): Unit = {
     try {
@@ -97,6 +99,8 @@ object HiveSQLEngine extends Logging {
     }
 
     val engine = new HiveSQLEngine()
+    val appName = s"kyuubi_${user}_hive_${Instant.now}"
+    hiveConf.setIfUnset("hive.engine.name", appName)
     info(s"Starting ${engine.getName}")
     engine.initialize(kyuubiConf)
     EventBus.post(HiveEngineEvent(engine))
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 2df4a072a..c7166356d 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.session.{HiveSession, SessionManager => HiveS
 import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
 import org.apache.kyuubi.operation.{AbstractOperation, FetchOrientation, 
OperationState, OperationStatus}
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -43,12 +44,14 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
 
   override def beforeRun(): Unit = {
     setState(OperationState.RUNNING)
+    hive.getHiveConf.set(KYUUBI_SESSION_USER_KEY, session.user) // for the session_user() KDF
   }
 
   override def afterRun(): Unit = {
     withLockRequired {
       if (!isTerminalState(state)) {
         setState(OperationState.FINISHED)
       }
+      hive.getHiveConf.unset(KYUUBI_SESSION_USER_KEY) // clear even if already in a terminal state
     }
   }
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
index 3b85f94df..5069b1379 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
@@ -27,6 +27,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.hive.events.HiveSessionEvent
+import org.apache.kyuubi.engine.hive.udf.KDFRegistry
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.session.{AbstractSession, SessionHandle, 
SessionManager}
@@ -48,6 +49,7 @@ class HiveSessionImpl(
     val confClone = new HashMap[String, String]()
     confClone.putAll(conf.asJava) // pass conf.asScala not support `put` method
     hive.open(confClone)
+    KDFRegistry.registerAll()
     EventBus.post(sessionEvent)
   }
 
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala
new file mode 100644
index 000000000..5ff468b77
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KDFRegistry.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.udf
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDFArgumentLengthException}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{PrimitiveObjectInspectorFactory, StringObjectInspector}
+
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
+
+/**
+ * Registry of the Kyuubi defined functions (KDFs) provided by the Hive engine.
+ *
+ * Each KDF is declared below as a `val` built through `create`, which also appends it to
+ * `registeredFunctions`; `registerAll()` then hands every KDF over to Hive.
+ */
+object KDFRegistry {
+
+  /**
+   * Every KDF built through `create`, in declaration order.
+   *
+   * Must stay declared above the KDF vals: object fields are initialized top to bottom
+   * and each `create` call appends to this buffer.
+   */
+  val registeredFunctions: ArrayBuffer[KyuubiDefinedFunction] =
+    new ArrayBuffer[KyuubiDefinedFunction]()
+
+  val kyuubi_version: KyuubiDefinedFunction = create(
+    "kyuubi_version",
+    new KyuubiVersionFunction,
+    "Return the version of Kyuubi Server",
+    "string",
+    "1.8.0")
+
+  val engine_name: KyuubiDefinedFunction = create(
+    "engine_name",
+    new EngineNameFunction,
+    "Return the name of engine",
+    "string",
+    "1.8.0")
+
+  val engine_id: KyuubiDefinedFunction = create(
+    "engine_id",
+    new EngineIdFunction,
+    "Return the id of engine",
+    "string",
+    "1.8.0")
+
+  val system_user: KyuubiDefinedFunction = create(
+    "system_user",
+    new SystemUserFunction,
+    "Return the system user",
+    "string",
+    "1.8.0")
+
+  val session_user: KyuubiDefinedFunction = create(
+    "session_user",
+    new SessionUserFunction,
+    "Return the session user",
+    "string",
+    "1.8.0")
+
+  /**
+   * Builds a `KyuubiDefinedFunction` and appends it to `registeredFunctions`.
+   *
+   * @param name SQL name the function is registered under
+   * @param udf Hive UDF implementing the function; only its class is handed to Hive
+   * @param description human-readable description of the function
+   * @param returnType name of the SQL return type, e.g. "string"
+   * @param since Kyuubi version that introduced the function
+   * @return the newly created function
+   */
+  def create(
+      name: String,
+      udf: GenericUDF,
+      description: String,
+      returnType: String,
+      since: String): KyuubiDefinedFunction = {
+    val kdf = KyuubiDefinedFunction(name, udf, description, returnType, since)
+    registeredFunctions += kdf
+    kdf
+  }
+
+  /**
+   * Registers every function in `registeredFunctions` with Hive through
+   * `FunctionRegistry.registerTemporaryUDF`. Called by `HiveSessionImpl` right after the
+   * underlying Hive session is opened.
+   */
+  def registerAll(): Unit = {
+    registeredFunctions.foreach { func =>
+      // Hive is given the UDF class, not the instance held by the KDF.
+      FunctionRegistry.registerTemporaryUDF(func.name, func.udf.getClass)
+    }
+  }
+}
+
+/**
+ * Common base of the zero-argument, string-returning Hive UDFs behind the KDFs above.
+ *
+ * @param displayName SQL call form of the function, e.g. `kyuubi_version()`; used in the
+ *                    argument-count error message and as the display string
+ */
+abstract class NoArgStringUDF(displayName: String) extends GenericUDF {
+  private val returnOI: StringObjectInspector =
+    PrimitiveObjectInspectorFactory.javaStringObjectInspector
+
+  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = {
+    if (arguments.length != 0) {
+      throw new UDFArgumentLengthException(
+        s"The function $displayName takes no arguments, got ${arguments.length}")
+    }
+    returnOI
+  }
+
+  override def getDisplayString(children: Array[String]): String = displayName
+
+  /**
+   * Reads `key` from the HiveConf of the calling thread's Hive `SessionState`.
+   *
+   * @return the configured value, or "" when the key is unset or when no `SessionState`
+   *         is attached to the calling thread (rather than failing with an NPE)
+   */
+  protected def sessionConf(key: String): String =
+    Option(SessionState.get).map(_.getConf.get(key, "")).getOrElse("")
+}
+
+/** Backs `kyuubi_version()`: returns `KYUUBI_VERSION`. */
+class KyuubiVersionFunction() extends NoArgStringUDF("kyuubi_version()") {
+  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = KYUUBI_VERSION
+}
+
+/** Backs `engine_name()`: returns `hive.engine.name` from the session conf, or "". */
+class EngineNameFunction() extends NoArgStringUDF("engine_name()") {
+  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
+    sessionConf("hive.engine.name")
+}
+
+/** Backs `engine_id()`: returns `KYUUBI_ENGINE_ID` from the session conf, or "". */
+class EngineIdFunction() extends NoArgStringUDF("engine_id()") {
+  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
+    sessionConf(KYUUBI_ENGINE_ID)
+}
+
+/** Backs `system_user()`: returns `Utils.currentUser`. */
+class SystemUserFunction() extends NoArgStringUDF("system_user()") {
+  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef = Utils.currentUser
+}
+
+/**
+ * Backs `session_user()`: returns `KYUUBI_SESSION_USER_KEY` from the session conf, or "".
+ * That key is expected to be set per operation by `HiveOperation.beforeRun`.
+ */
+class SessionUserFunction() extends NoArgStringUDF("session_user()") {
+  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): AnyRef =
+    sessionConf(KYUUBI_SESSION_USER_KEY)
+}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala
new file mode 100644
index 000000000..ee91a804e
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/udf/KyuubiDefinedFunction.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.udf
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+
+/**
+ * Describes a Kyuubi defined function (KDF) of the Hive engine, backed by a Hive [[GenericUDF]].
+ *
+ * @param name function name
+ * @param udf Hive UDF implementing the function
+ * @param description function description
+ * @param returnType name of the function's SQL return type, e.g. "string"
+ * @param since Kyuubi version that introduced the function
+ */
+case class KyuubiDefinedFunction(
+    name: String,
+    udf: GenericUDF,
+    description: String,
+    returnType: String,
+    since: String)
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index f949ec37a..eb10e0b41 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.hive.operation
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 
-import org.apache.kyuubi.{HiveEngineTests, Utils}
+import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 
@@ -49,4 +49,20 @@ class HiveOperationSuite extends HiveEngineTests {
       assert(kyuubiStatement.getQueryId != null)
     }
   }
+
+  test("kyuubi defined function - kyuubi_version") {
+    withJdbcStatement("hive_engine_test") { stmt =>
+      val resultSet = stmt.executeQuery("SELECT kyuubi_version()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === KYUUBI_VERSION)
+    }
+  }
+
+  test("kyuubi defined function - engine_name") {
+    withJdbcStatement("hive_engine_test") { stmt =>
+      val resultSet = stmt.executeQuery("SELECT engine_name()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    }
+  }
 }
diff --git a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
index a4e6bb150..07e2bc0f2 100644
--- a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
+++ b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
@@ -61,4 +61,21 @@ class KyuubiOperationHiveEnginePerUserSuite extends WithKyuubiServer with HiveEn
       }
     }
   }
+
+  test("kyuubi defined function - system_user, session_user") {
+    withJdbcStatement("hive_engine_test") { stmt =>
+      val resultSet = stmt.executeQuery("SELECT system_user(), session_user()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === Utils.currentUser)
+      assert(resultSet.getString(2) === Utils.currentUser)
+    }
+  }
+
+  test("kyuubi defined function - engine_id") {
+    withJdbcStatement("hive_engine_test") { stmt =>
+      val resultSet = stmt.executeQuery("SELECT engine_id()")
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    }
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index ae40fbd4f..052f1de0f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -195,6 +195,7 @@ private[kyuubi] class EngineRef(
       case TRINO =>
         new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case HIVE_SQL =>
+        conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, 
defaultEngineName)
         new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case JDBC =>
         new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index e86597c5c..61fe55887 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -29,7 +29,7 @@ import com.google.common.annotations.VisibleForTesting
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH, 
ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, 
KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
 import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
 import org.apache.kyuubi.operation.log.OperationLog
@@ -106,6 +106,8 @@ class HiveProcessBuilder(
 
     buffer += "--conf"
     buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+    buffer += "--conf"
+    buffer += s"$KYUUBI_ENGINE_ID=$engineRefId"
 
     for ((k, v) <- conf.getAll) {
       buffer += "--conf"
@@ -121,4 +123,5 @@ class HiveProcessBuilder(
 
 object HiveProcessBuilder {
   final val HIVE_HADOOP_CLASSPATH_KEY = "HIVE_HADOOP_CLASSPATH"
+  final val HIVE_ENGINE_NAME = "hive.engine.name" // conf key read back by the engine_name() KDF
 }

Reply via email to