This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b8fd37852 [KYUUBI #2029] Hive Backend Engine - Operation Logs
b8fd37852 is described below

commit b8fd37852611652665e53ebda20ad4b061f5e2f4
Author: Kent Yao <[email protected]>
AuthorDate: Tue May 10 10:19:36 2022 +0800

    [KYUUBI #2029] Hive Backend Engine - Operation Logs
    
    ### _Why are the changes needed?_
    
    Taking over #2174; all credit belongs to KenjiFujima
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2586 from yaooqinn/PR_2174.
    
    Closes #2029
    
    6b45a4f6 [Kent Yao] followup
    7861fb20 [KenjiFujima] [KYUUBI #2029] Hive Backend Engine - Operation Logs
    
    Lead-authored-by: Kent Yao <[email protected]>
    Co-authored-by: KenjiFujima <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../engine/hive/operation/HiveOperation.scala      | 19 ++++++++---
 .../hive/operation/HiveOperationManager.scala      | 37 +---------------------
 .../spark/session/SparkSQLSessionManager.scala     |  9 +-----
 .../engine/trino/session/TrinoSessionManager.scala |  9 ------
 .../org/apache/kyuubi/session/SessionManager.scala | 13 +++++---
 .../kyuubi/operation/log/OperationLogSuite.scala   |  6 ++--
 .../apache/kyuubi/session/NoopSessionManager.scala | 10 ------
 .../kyuubi/session/KyuubiSessionManager.scala      |  4 +--
 8 files changed, 29 insertions(+), 78 deletions(-)

diff --git 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 93ec8ca72..7a92904b5 100644
--- 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -19,9 +19,7 @@ package org.apache.kyuubi.engine.hive.operation
 
 import java.util.concurrent.Future
 
-import org.apache.hive.service.cli.{OperationState => HiveOperationState}
-import org.apache.hive.service.cli.operation.Operation
-import org.apache.hive.service.cli.operation.OperationManager
+import org.apache.hive.service.cli.operation.{Operation, OperationManager}
 import org.apache.hive.service.cli.session.{HiveSession, SessionManager => 
HiveSessionManager}
 import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
 
@@ -67,11 +65,11 @@ abstract class HiveOperation(opType: OperationType, 
session: Session)
   }
 
   override def cancel(): Unit = {
-    internalHiveOperation.cancel(HiveOperationState.CANCELED)
+    delegatedOperationManager.cancelOperation(internalHiveOperation.getHandle)
   }
 
   override def close(): Unit = {
-    internalHiveOperation.close()
+    delegatedOperationManager.closeOperation(internalHiveOperation.getHandle)
   }
 
   override def getStatus: OperationStatus = {
@@ -99,6 +97,17 @@ abstract class HiveOperation(opType: OperationType, session: 
Session)
     rowSet.toTRowSet
   }
 
+  def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet 
= {
+    val tOrder = FetchOrientation.toTFetchOrientation(order)
+    val hiveOrder = 
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
+    val handle = internalHiveOperation.getHandle
+    delegatedOperationManager.getOperationLogRowSet(
+      handle,
+      hiveOrder,
+      rowSetSize,
+      hive.getHiveConf).toTRowSet
+  }
+
   override def isTimedOut: Boolean = 
internalHiveOperation.isTimedOut(System.currentTimeMillis)
 
   override def shouldRunAsync: Boolean = false
diff --git 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index eb46cb8bc..cdfea54fa 100644
--- 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -17,17 +17,11 @@
 
 package org.apache.kyuubi.engine.hive.operation
 
-import java.sql.SQLException
 import java.util.List
 
-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
-import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
 import org.apache.hive.service.rpc.thrift.TRowSet
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.operation.{Operation, OperationHandle, 
OperationManager}
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.Session
@@ -130,37 +124,8 @@ class HiveOperationManager() extends 
OperationManager("HiveOperationManager") {
       opHandle: OperationHandle,
       order: FetchOrientation,
       maxRows: Int): TRowSet = {
-    def getLogSchema: TableSchema = {
-      val schema = new Schema
-      val fieldSchema = new FieldSchema
-      fieldSchema.setName("operation_log")
-      fieldSchema.setType("string")
-      schema.addToFieldSchemas(fieldSchema)
-      new TableSchema(schema)
-    }
-
     val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
-    val internalHiveOperation = operation.internalHiveOperation
-
-    val rowSet = RowSetFactory.create(getLogSchema, 
operation.getProtocolVersion, false)
-    val operationLog = internalHiveOperation.getOperationLog
-    if (operationLog == null) {
-      // TODO: #2029 Operation Log support: set and read hive one directly
-      // throw KyuubiSQLException("Couldn't find log associated with operation 
handle: " + opHandle)
-      return rowSet.toTRowSet
-    }
-
-    try {
-      val logs = operationLog.readOperationLog(false, maxRows)
-      for (log <- logs.asScala) {
-        rowSet.addRow(Array(log))
-      }
-    } catch {
-      case e: SQLException =>
-        throw new KyuubiSQLException(e.getMessage, e.getCause)
-    }
-
-    rowSet.toTRowSet
+    operation.getOperationLogRowSet(order, maxRows)
   }
 
   override def getQueryId(operation: Operation): String = {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 5b07f2f57..d0cf793ec 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.{ScheduledExecutorService, 
TimeUnit}
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.spark.sql.SparkSession
 
-import org.apache.kyuubi.{KyuubiSQLException, Utils}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel._
@@ -45,12 +44,6 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
 
   def this(spark: SparkSession) = 
this(classOf[SparkSQLSessionManager].getSimpleName, spark)
 
-  override def initialize(conf: KyuubiConf): Unit = {
-    val absPath = 
Utils.getAbsolutePathFromWork(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
-    _operationLogRoot = Some(absPath.toAbsolutePath.toString)
-    super.initialize(conf)
-  }
-
   val operationManager = new SparkSQLOperationManager()
 
   private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index aa1823bc7..6d56d5c05 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -19,9 +19,6 @@ package org.apache.kyuubi.engine.trino.session
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.trino.TrinoSqlEngine
@@ -33,12 +30,6 @@ class TrinoSessionManager
 
   val operationManager = new TrinoOperationManager()
 
-  override def initialize(conf: KyuubiConf): Unit = {
-    val absPath = 
Utils.getAbsolutePathFromWork(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
-    _operationLogRoot = Some(absPath.toAbsolutePath.toString)
-    super.initialize(conf)
-  }
-
   override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index ccc263915..2ed469fee 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -50,10 +50,14 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
 
   private def initOperationLogRootDir(): Unit = {
     try {
-      _operationLogRoot.foreach { logRoot =>
-        val rootPath = Paths.get(logRoot)
-        Files.createDirectories(rootPath)
-      }
+      val logRoot =
+        if (isServer) {
+          conf.get(SERVER_OPERATION_LOG_DIR_ROOT)
+        } else {
+          conf.get(ENGINE_OPERATION_LOG_DIR_ROOT)
+        }
+      val logPath = 
Files.createDirectories(Utils.getAbsolutePathFromWork(logRoot))
+      _operationLogRoot = Some(logPath.toString)
     } catch {
       case e: IOException =>
         error(s"Failed to initialize operation log root directory: 
${_operationLogRoot}", e)
@@ -227,6 +231,7 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
   }
 
   override def initialize(conf: KyuubiConf): Unit = synchronized {
+    this.conf = conf
     addService(operationManager)
     initOperationLogRootDir()
 
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 7523e005b..77df2bf32 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -172,9 +172,9 @@ class OperationLogSuite extends KyuubiFunSuite {
     val tempDir = Utils.createTempDir().toFile
     tempDir.setExecutable(false)
 
-    sessionManager.setOperationLogRootDir(tempDir.getAbsolutePath + 
"/operation_logs")
-    assert(sessionManager.operationLogRoot.isDefined)
-    sessionManager.initialize(KyuubiConf())
+    val conf = KyuubiConf()
+      .set(KyuubiConf.SERVER_OPERATION_LOG_DIR_ROOT, tempDir.getAbsolutePath + 
"/operation_logs")
+    sessionManager.initialize(conf)
     assert(sessionManager.operationLogRoot.isEmpty)
 
     tempDir.setExecutable(true)
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 392759ede..3a4088ed2 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -20,21 +20,11 @@ package org.apache.kyuubi.session
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.{NoopOperationManager, OperationManager}
 
 class NoopSessionManager extends SessionManager("noop") {
   override val operationManager: OperationManager = new NoopOperationManager()
 
-  def setOperationLogRootDir(logRoot: String): Unit = {
-    _operationLogRoot = Some(logRoot)
-  }
-
-  override def initialize(conf: KyuubiConf): Unit = {
-    _operationLogRoot = _operationLogRoot.orElse(Some("target/operation_logs"))
-    super.initialize(conf)
-  }
-
   override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9e215168e..8b8382cd1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.cli.HandleIdentifier
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
@@ -50,8 +50,6 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   override def initialize(conf: KyuubiConf): Unit = {
     addService(applicationManager)
     addService(credentialsManager)
-    val absPath = 
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
-    _operationLogRoot = Some(absPath.toAbsolutePath.toString)
     initSessionLimiter(conf)
     super.initialize(conf)
   }

Reply via email to