This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b8fd37852 [KYUUBI #2029] Hive Backend Engine - Operation Logs
b8fd37852 is described below
commit b8fd37852611652665e53ebda20ad4b061f5e2f4
Author: Kent Yao <[email protected]>
AuthorDate: Tue May 10 10:19:36 2022 +0800
[KYUUBI #2029] Hive Backend Engine - Operation Logs
### _Why are the changes needed?_
Taking over #2174; all credit belongs to KenjiFujima
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2586 from yaooqinn/PR_2174.
Closes #2029
6b45a4f6 [Kent Yao] followup
7861fb20 [KenjiFujima] [KYUUBI #2029] Hive Backend Engine - Operation Logs
Lead-authored-by: Kent Yao <[email protected]>
Co-authored-by: KenjiFujima <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../engine/hive/operation/HiveOperation.scala | 19 ++++++++---
.../hive/operation/HiveOperationManager.scala | 37 +---------------------
.../spark/session/SparkSQLSessionManager.scala | 9 +-----
.../engine/trino/session/TrinoSessionManager.scala | 9 ------
.../org/apache/kyuubi/session/SessionManager.scala | 13 +++++---
.../kyuubi/operation/log/OperationLogSuite.scala | 6 ++--
.../apache/kyuubi/session/NoopSessionManager.scala | 10 ------
.../kyuubi/session/KyuubiSessionManager.scala | 4 +--
8 files changed, 29 insertions(+), 78 deletions(-)
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index 93ec8ca72..7a92904b5 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -19,9 +19,7 @@ package org.apache.kyuubi.engine.hive.operation
import java.util.concurrent.Future
-import org.apache.hive.service.cli.{OperationState => HiveOperationState}
-import org.apache.hive.service.cli.operation.Operation
-import org.apache.hive.service.cli.operation.OperationManager
+import org.apache.hive.service.cli.operation.{Operation, OperationManager}
import org.apache.hive.service.cli.session.{HiveSession, SessionManager =>
HiveSessionManager}
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
@@ -67,11 +65,11 @@ abstract class HiveOperation(opType: OperationType,
session: Session)
}
override def cancel(): Unit = {
- internalHiveOperation.cancel(HiveOperationState.CANCELED)
+ delegatedOperationManager.cancelOperation(internalHiveOperation.getHandle)
}
override def close(): Unit = {
- internalHiveOperation.close()
+ delegatedOperationManager.closeOperation(internalHiveOperation.getHandle)
}
override def getStatus: OperationStatus = {
@@ -99,6 +97,17 @@ abstract class HiveOperation(opType: OperationType, session:
Session)
rowSet.toTRowSet
}
+ def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
= {
+ val tOrder = FetchOrientation.toTFetchOrientation(order)
+ val hiveOrder =
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
+ val handle = internalHiveOperation.getHandle
+ delegatedOperationManager.getOperationLogRowSet(
+ handle,
+ hiveOrder,
+ rowSetSize,
+ hive.getHiveConf).toTRowSet
+ }
+
override def isTimedOut: Boolean =
internalHiveOperation.isTimedOut(System.currentTimeMillis)
override def shouldRunAsync: Boolean = false
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index eb46cb8bc..cdfea54fa 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -17,17 +17,11 @@
package org.apache.kyuubi.engine.hive.operation
-import java.sql.SQLException
import java.util.List
-import scala.collection.JavaConverters._
-
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
-import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
import org.apache.hive.service.rpc.thrift.TRowSet
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.{Operation, OperationHandle,
OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
@@ -130,37 +124,8 @@ class HiveOperationManager() extends
OperationManager("HiveOperationManager") {
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
- def getLogSchema: TableSchema = {
- val schema = new Schema
- val fieldSchema = new FieldSchema
- fieldSchema.setName("operation_log")
- fieldSchema.setType("string")
- schema.addToFieldSchemas(fieldSchema)
- new TableSchema(schema)
- }
-
val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
- val internalHiveOperation = operation.internalHiveOperation
-
- val rowSet = RowSetFactory.create(getLogSchema,
operation.getProtocolVersion, false)
- val operationLog = internalHiveOperation.getOperationLog
- if (operationLog == null) {
- // TODO: #2029 Operation Log support: set and read hive one directly
- // throw KyuubiSQLException("Couldn't find log associated with operation
handle: " + opHandle)
- return rowSet.toTRowSet
- }
-
- try {
- val logs = operationLog.readOperationLog(false, maxRows)
- for (log <- logs.asScala) {
- rowSet.addRow(Array(log))
- }
- } catch {
- case e: SQLException =>
- throw new KyuubiSQLException(e.getMessage, e.getCause)
- }
-
- rowSet.toTRowSet
+ operation.getOperationLogRowSet(order, maxRows)
}
override def getQueryId(operation: Operation): String = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 5b07f2f57..d0cf793ec 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.{ScheduledExecutorService,
TimeUnit}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
-import org.apache.kyuubi.{KyuubiSQLException, Utils}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
@@ -45,12 +44,6 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
def this(spark: SparkSession) =
this(classOf[SparkSQLSessionManager].getSimpleName, spark)
- override def initialize(conf: KyuubiConf): Unit = {
- val absPath =
Utils.getAbsolutePathFromWork(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
- _operationLogRoot = Some(absPath.toAbsolutePath.toString)
- super.initialize(conf)
- }
-
val operationManager = new SparkSQLOperationManager()
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index aa1823bc7..6d56d5c05 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -19,9 +19,6 @@ package org.apache.kyuubi.engine.trino.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.trino.TrinoSqlEngine
@@ -33,12 +30,6 @@ class TrinoSessionManager
val operationManager = new TrinoOperationManager()
- override def initialize(conf: KyuubiConf): Unit = {
- val absPath =
Utils.getAbsolutePathFromWork(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
- _operationLogRoot = Some(absPath.toAbsolutePath.toString)
- super.initialize(conf)
- }
-
override protected def createSession(
protocol: TProtocolVersion,
user: String,
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index ccc263915..2ed469fee 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -50,10 +50,14 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
private def initOperationLogRootDir(): Unit = {
try {
- _operationLogRoot.foreach { logRoot =>
- val rootPath = Paths.get(logRoot)
- Files.createDirectories(rootPath)
- }
+ val logRoot =
+ if (isServer) {
+ conf.get(SERVER_OPERATION_LOG_DIR_ROOT)
+ } else {
+ conf.get(ENGINE_OPERATION_LOG_DIR_ROOT)
+ }
+ val logPath =
Files.createDirectories(Utils.getAbsolutePathFromWork(logRoot))
+ _operationLogRoot = Some(logPath.toString)
} catch {
case e: IOException =>
error(s"Failed to initialize operation log root directory:
${_operationLogRoot}", e)
@@ -227,6 +231,7 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
}
override def initialize(conf: KyuubiConf): Unit = synchronized {
+ this.conf = conf
addService(operationManager)
initOperationLogRootDir()
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 7523e005b..77df2bf32 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -172,9 +172,9 @@ class OperationLogSuite extends KyuubiFunSuite {
val tempDir = Utils.createTempDir().toFile
tempDir.setExecutable(false)
- sessionManager.setOperationLogRootDir(tempDir.getAbsolutePath +
"/operation_logs")
- assert(sessionManager.operationLogRoot.isDefined)
- sessionManager.initialize(KyuubiConf())
+ val conf = KyuubiConf()
+ .set(KyuubiConf.SERVER_OPERATION_LOG_DIR_ROOT, tempDir.getAbsolutePath +
"/operation_logs")
+ sessionManager.initialize(conf)
assert(sessionManager.operationLogRoot.isEmpty)
tempDir.setExecutable(true)
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 392759ede..3a4088ed2 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -20,21 +20,11 @@ package org.apache.kyuubi.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{NoopOperationManager, OperationManager}
class NoopSessionManager extends SessionManager("noop") {
override val operationManager: OperationManager = new NoopOperationManager()
- def setOperationLogRootDir(logRoot: String): Unit = {
- _operationLogRoot = Some(logRoot)
- }
-
- override def initialize(conf: KyuubiConf): Unit = {
- _operationLogRoot = _operationLogRoot.orElse(Some("target/operation_logs"))
- super.initialize(conf)
- }
-
override protected def createSession(
protocol: TProtocolVersion,
user: String,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9e215168e..8b8382cd1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -22,7 +22,7 @@ import java.util.UUID
import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.cli.HandleIdentifier
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
@@ -50,8 +50,6 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
override def initialize(conf: KyuubiConf): Unit = {
addService(applicationManager)
addService(credentialsManager)
- val absPath =
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
- _operationLogRoot = Some(absPath.toAbsolutePath.toString)
initSessionLimiter(conf)
super.initialize(conf)
}