This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1554032  [KYUUBI #1346] Support launch query engine asynchronously 
during opening session
1554032 is described below

commit 1554032df2576695f6345232dc61c0162644aec3
Author: fwang12 <[email protected]>
AuthorDate: Sat Nov 13 17:25:57 2021 +0800

    [KYUUBI #1346] Support launch query engine asynchronously during opening 
session
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Support launch query engine asynchronously during opening session, make it 
more user-friendly.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1346 from turboFei/kyuubi_hive_jdbc_ext.
    
    Closes #1346
    
    513691d6 [fwang12] address comments
    63b8f901 [fwang12] remove hanle to client
    4590ad98 [fwang12] remove synchronized
    f3b136b4 [fwang12] add ut for open session with failure
    2cae9afa [fwang12] address comments
    35113d0f [fwang12] add engine session id to session event
    34c27f9c [fwang12] refactor log
    b5f7e639 [fwang12] block
    b8b0cf9d [fwang12] to support aync client
    66eb07f3 [fwang12] address comments
    623c0511 [fwang12] volatile
    282c9f07 [fwang12] refactor conf
    5047d91d [fwang12] server session handle is not same with that of engine
    dc1cb8bc [fwang12] update docs
    b773fd9d [fwang12] refactor
    5f657e44 [fwang12] add ut
    9e56cbab [fwang12] exclude engine init engine
    6abdb52c [fwang12] refactor
    d3e37a8c [fwang12] block until engine finished
    085b324d [fwang12] save
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 10 +++
 .../apache/kyuubi/operation/OperationType.scala    |  5 +-
 .../kyuubi/operation/HiveJDBCTestHelper.scala      |  3 +
 .../apache/kyuubi/events/KyuubiSessionEvent.scala  |  2 +
 .../apache/kyuubi/operation/ExecuteStatement.scala |  4 +-
 .../org/apache/kyuubi/operation/GetCatalogs.scala  |  4 +-
 .../org/apache/kyuubi/operation/GetColumns.scala   |  4 +-
 .../org/apache/kyuubi/operation/GetFunctions.scala |  4 +-
 .../org/apache/kyuubi/operation/GetSchemas.scala   |  4 +-
 .../apache/kyuubi/operation/GetTableTypes.scala    |  7 +-
 .../org/apache/kyuubi/operation/GetTables.scala    |  4 +-
 .../org/apache/kyuubi/operation/GetTypeInfo.scala  |  4 +-
 .../apache/kyuubi/operation/KyuubiOperation.scala  | 11 ++-
 .../kyuubi/operation/KyuubiOperationManager.scala  | 56 ++++----------
 .../org/apache/kyuubi/operation/LaunchEngine.scala | 64 +++++++++++++++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 90 +++++++++++++++-------
 .../kyuubi/events/EventLoggingServiceSuite.scala   |  6 +-
 .../KyuubiOperationPerConnectionSuite.scala        | 35 +++++++++
 .../operation/KyuubiOperationPerUserSuite.scala    | 18 +++++
 20 files changed, 232 insertions(+), 104 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index bd213a1..fa658cd 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -295,6 +295,7 @@ kyuubi\.session\.conf<br>\.restrict\.list|<div 
style='width: 65pt;word-wrap: bre
 kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT5M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>The check interval for engine 
timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 
20pt'>1.0.0</div>
 kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>engine timeout, the engine will self-terminate 
when it's not accessed for this duration</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the 
background engine, e.g. SparkSQLEngine.</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.session\.engine<br>\.launch\.async|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>When opening kyuubi session, whether to launch 
backend engine asynchronously. When true, the Kyuubi server will set up the 
connection with the client without delay as the backend engine will be created 
asynchronously.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.session\.engine<br>\.log\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT24H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>If we use Spark as the engine then the session 
submit log is the console output of spark-submit. We will retain the session 
submit log until over the config value.</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.1.0</div>
 kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>The timeout of creating the connection to 
remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.0.0</div>
 kyuubi\.session\.engine<br>\.request\.timeout|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting 
response after sending request to remote sql query engine</div>|<div 
style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0a3eac3..a5c309c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -657,6 +657,16 @@ object KyuubiConf {
       .checkValue(_ > 0, "the maximum must be positive integer.")
       .createWithDefault(10)
 
+  // TODO: make it true by default
+  val SESSION_ENGINE_LAUNCH_ASYNC: ConfigEntry[Boolean] =
+    buildConf("session.engine.launch.async")
+      .doc("When opening kyuubi session, whether to launch backend engine 
asynchronously." +
+        " When true, the Kyuubi server will set up the connection with the 
client without delay" +
+        " as the backend engine will be created asynchronously.")
+      .version("1.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
     buildConf("backend.server.exec.pool.size")
       .doc("Number of threads in the operation execution thread pool of Kyuubi 
server")
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
index e4686aa..abc7bf3 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
@@ -30,7 +30,8 @@ object OperationType extends Enumeration {
       GET_TABLES,
       GET_TABLE_TYPES,
       GET_COLUMNS,
-      GET_FUNCTIONS = Value
+      GET_FUNCTIONS,
+      LAUNCH_ENGINE = Value
 
   def getOperationType(from: TOperationType): OperationType = {
     from match {
@@ -57,6 +58,8 @@ object OperationType extends Enumeration {
       case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES
       case GET_COLUMNS => TOperationType.GET_COLUMNS
       case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS
+      // LAUNCH_ENGINE is not an OperationType defined in Hive thrift protocol
+      case LAUNCH_ENGINE => TOperationType.UNKNOWN
       case other =>
         throw new UnsupportedOperationException(s"Unsupported Operation type: 
${other.toString}")
     }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index c21392a..44bdb65 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.operation
 import java.sql.{DriverManager, ResultSet, SQLException, Statement}
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import org.apache.hive.service.rpc.thrift._
 import org.apache.hive.service.rpc.thrift.TCLIService.Iface
 import org.apache.hive.service.rpc.thrift.TOperationState._
@@ -154,6 +156,7 @@ trait HiveJDBCTestHelper extends KyuubiFunSuite {
       val req = new TOpenSessionReq()
       req.setUsername(user)
       req.setPassword("anonymous")
+      req.setConfiguration(_sessionConfigs.asJava)
       val resp = client.OpenSession(req)
       val handle = resp.getSessionHandle
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 44a9517..60f2286 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -24,6 +24,7 @@ import org.apache.kyuubi.session.KyuubiSessionImpl
 
 /**
  * @param sessionId server session id
+ * @param remoteSessionId remote engine session id
  * @param sessionName if user not specify it, we use empty string instead
  * @param user session user
  * @param clientIP client ip address
@@ -44,6 +45,7 @@ case class KyuubiSessionEvent(
     conf: Map[String, String],
     startTime: Long,
     var sessionId: String = "",
+    var remoteSessionId: String = "",
     var clientVersion: Int = -1,
     var openedTime: Long = -1L,
     var endTime: Long = -1L,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 643080a..82a14f3 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -24,7 +24,6 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
 import org.apache.thrift.TException
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.events.KyuubiStatementEvent
 import org.apache.kyuubi.metrics.MetricsConstants._
@@ -37,11 +36,10 @@ import org.apache.kyuubi.session.{KyuubiSessionImpl, 
KyuubiSessionManager, Sessi
 
 class ExecuteStatement(
     session: Session,
-    client: KyuubiSyncThriftClient,
     override val statement: String,
     override val shouldRunAsync: Boolean,
     queryTimeout: Long)
-  extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session, client) {
+  extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
   EventLoggingService.onEvent(KyuubiStatementEvent(this))
 
   private final val _operationLog: OperationLog = if (shouldRunAsync) {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
index f46dc7a..7553373 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
@@ -17,11 +17,9 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
-class GetCatalogs(session: Session, client: KyuubiSyncThriftClient)
-  extends KyuubiOperation(OperationType.GET_CATALOGS, session, client) {
+class GetCatalogs(session: Session) extends 
KyuubiOperation(OperationType.GET_CATALOGS, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
index f732029..705372c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
@@ -17,17 +17,15 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
 class GetColumns(
     session: Session,
-    client: KyuubiSyncThriftClient,
     catalogName: String,
     schemaName: String,
     tableName: String,
     columnName: String)
-  extends KyuubiOperation(OperationType.GET_COLUMNS, session, client) {
+  extends KyuubiOperation(OperationType.GET_COLUMNS, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
index d8c6bcb..ecc475c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
@@ -17,16 +17,14 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
 class GetFunctions(
     session: Session,
-    client: KyuubiSyncThriftClient,
     catalogName: String,
     schemaName: String,
     functionName: String)
-  extends KyuubiOperation(OperationType.GET_FUNCTIONS, session, client)  {
+  extends KyuubiOperation(OperationType.GET_FUNCTIONS, session)  {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
index 236e801..b6d3d54 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
@@ -17,15 +17,13 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
 class GetSchemas(
     session: Session,
-    client: KyuubiSyncThriftClient,
     catalogName: String,
     schemaName: String)
-  extends KyuubiOperation(OperationType.GET_SCHEMAS, session, client) {
+  extends KyuubiOperation(OperationType.GET_SCHEMAS, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
index 0eea8da..8ed42e7 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
@@ -17,13 +17,10 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
-class GetTableTypes(
-    session: Session,
-    client: KyuubiSyncThriftClient)
-  extends KyuubiOperation(OperationType.GET_TABLE_TYPES, session, client) {
+class GetTableTypes(session: Session) extends
+  KyuubiOperation(OperationType.GET_TABLE_TYPES, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
index 4400f4d..697c7d6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
@@ -17,17 +17,15 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
 class GetTables(
     session: Session,
-    client: KyuubiSyncThriftClient,
     catalogName: String,
     schemaName: String,
     tableName: String,
     tableTypes: java.util.List[String])
-  extends KyuubiOperation(OperationType.GET_TABLES, session, client) {
+  extends KyuubiOperation(OperationType.GET_TABLES, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
index c98a238..4a1b605 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
@@ -17,11 +17,9 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.session.Session
 
-class GetTypeInfo(session: Session, client: KyuubiSyncThriftClient)
-  extends KyuubiOperation(OperationType.GET_TYPE_INFO, session, client) {
+class GetTypeInfo(session: Session) extends 
KyuubiOperation(OperationType.GET_TYPE_INFO, session) {
 
   override protected def runInternal(): Unit = {
     try {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index a69814d..83922c4 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -26,18 +26,17 @@ import org.apache.thrift.TException
 import org.apache.thrift.transport.TTransportException
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.metrics.MetricsConstants.STATEMENT_FAIL
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationType.OperationType
-import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
-abstract class KyuubiOperation(
-    opType: OperationType,
-    session: Session,
-    client: KyuubiSyncThriftClient) extends AbstractOperation(opType, session) 
{
+abstract class KyuubiOperation(opType: OperationType, session: Session)
+  extends AbstractOperation(opType, session) {
+
+  protected[operation] lazy val client = 
session.asInstanceOf[KyuubiSessionImpl].client
 
   @volatile protected var _remoteOpHandle: TOperationHandle = _
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 35d7076..4af1c35 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -17,24 +17,20 @@
 
 package org.apache.kyuubi.operation
 
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import org.apache.hive.service.rpc.thrift.TRowSet
 
-import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.session.{Session, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
 class KyuubiOperationManager private (name: String) extends 
OperationManager(name) {
 
   def this() = this(classOf[KyuubiOperationManager].getSimpleName)
 
-  private val handleToClient = new ConcurrentHashMap[SessionHandle, 
KyuubiSyncThriftClient]()
-
   private var queryTimeout: Option[Long] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
@@ -42,14 +38,6 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
     super.initialize(conf)
   }
 
-  private def getThriftClient(sessionHandle: SessionHandle): 
KyuubiSyncThriftClient = {
-    val client = handleToClient.get(sessionHandle)
-    if (client == null) {
-      throw KyuubiSQLException(s"$sessionHandle has not been initialized or 
already been closed")
-    }
-    client
-  }
-
   private def getQueryTimeout(clientQueryTimeout: Long): Long = {
     // If clientQueryTimeout is smaller than systemQueryTimeout value,
     // we use the clientQueryTimeout value.
@@ -61,34 +49,23 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
     }
   }
 
-  def setConnection(sessionHandle: SessionHandle, client: 
KyuubiSyncThriftClient): Unit = {
-    handleToClient.put(sessionHandle, client)
-  }
-
-  def removeConnection(sessionHandle: SessionHandle): Unit = {
-    handleToClient.remove(sessionHandle)
-  }
-
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new ExecuteStatement(session, client, statement, runAsync,
+    val operation = new ExecuteStatement(session, statement, runAsync,
       getQueryTimeout(queryTimeout))
     addOperation(operation)
   }
 
   override def newGetTypeInfoOperation(session: Session): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetTypeInfo(session, client)
+    val operation = new GetTypeInfo(session)
     addOperation(operation)
   }
 
   override def newGetCatalogsOperation(session: Session): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetCatalogs(session, client)
+    val operation = new GetCatalogs(session)
     addOperation(operation)
   }
 
@@ -96,8 +73,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       session: Session,
       catalog: String,
       schema: String): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetSchemas(session, client, catalog, schema)
+    val operation = new GetSchemas(session, catalog, schema)
     addOperation(operation)
   }
 
@@ -107,15 +83,12 @@ class KyuubiOperationManager private (name: String) 
extends OperationManager(nam
       schemaName: String,
       tableName: String,
       tableTypes: java.util.List[String]): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetTables(
-      session, client, catalogName, schemaName, tableName, tableTypes)
+    val operation = new GetTables(session, catalogName, schemaName, tableName, 
tableTypes)
     addOperation(operation)
   }
 
   override def newGetTableTypesOperation(session: Session): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetTableTypes(session, client)
+    val operation = new GetTableTypes(session)
     addOperation(operation)
   }
 
@@ -125,8 +98,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       schemaName: String,
       tableName: String,
       columnName: String): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetColumns(session, client, catalogName, schemaName, 
tableName, columnName)
+    val operation = new GetColumns(session, catalogName, schemaName, 
tableName, columnName)
     addOperation(operation)
   }
 
@@ -135,8 +107,12 @@ class KyuubiOperationManager private (name: String) 
extends OperationManager(nam
       catalogName: String,
       schemaName: String,
       functionName: String): Operation = {
-    val client = getThriftClient(session.handle)
-    val operation = new GetFunctions(session, client, catalogName, schemaName, 
functionName)
+    val operation = new GetFunctions(session, catalogName, schemaName, 
functionName)
+    addOperation(operation)
+  }
+
+  def newLaunchEngineOperation(session: KyuubiSessionImpl, shouldRunAsync: 
Boolean): Operation = {
+    val operation = new LaunchEngine(session, shouldRunAsync)
     addOperation(operation)
   }
 
@@ -150,7 +126,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       case Some(log) => log.read(maxRows)
       case None =>
         val remoteHandle = operation.remoteOpHandle()
-        val client = getThriftClient(operation.getSession.handle)
+        val client = operation.client
         if (remoteHandle != null) {
           client.fetchResults(remoteHandle, order, maxRows, fetchLog = true)
         } else {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
new file mode 100644
index 0000000..9b89481
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.KyuubiSessionImpl
+
+class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: 
Boolean) extends
+  KyuubiOperation(OperationType.LAUNCH_ENGINE, session) {
+
+  private lazy val _operationLog: OperationLog = if (shouldRunAsync) {
+    OperationLog.createOperationLog(session, getHandle)
+  } else {
+    // when launch engine synchronously, operation log is not needed
+    null
+  }
+  override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(_operationLog)
+    setHasResultSet(false)
+    setState(OperationState.PENDING)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  override protected def runInternal(): Unit = {
+    val asyncOperation: Runnable = () => {
+      setState(OperationState.RUNNING)
+      try {
+        session.openEngineSession()
+        setState(OperationState.FINISHED)
+      } catch {
+        onError()
+      } finally {
+        // TODO: delay to close it for async mode to enable client to get more 
launch engine log
+        session.closeOperation(getHandle)
+      }
+    }
+    try {
+      val opHandle = 
session.sessionManager.submitBackgroundOperation(asyncOperation)
+      setBackgroundHandle(opHandle)
+    } catch onError("submitting open engine operation in background, request 
rejected")
+
+    if (!shouldRunAsync) getBackgroundHandle.get()
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d847b0c..469f06c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -42,7 +42,7 @@ class KyuubiSessionImpl(
     password: String,
     ipAddress: String,
     conf: Map[String, String],
-    sessionManager: KyuubiSessionManager,
+    override val sessionManager: KyuubiSessionManager,
     sessionConf: KyuubiConf)
   extends AbstractSession(protocol, user, password, ipAddress, conf, 
sessionManager) {
 
@@ -54,62 +54,94 @@ class KyuubiSessionImpl(
   }
 
   val engine: EngineRef = new EngineRef(sessionConf, user)
+  val launchEngineAsync = sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)
+  private val launchEngineOp = 
sessionManager.operationManager.newLaunchEngineOperation(
+    this, launchEngineAsync)
+  @volatile
+  var engineLaunched: Boolean = false
 
   private val sessionEvent = KyuubiSessionEvent(this)
   EventLoggingService.onEvent(sessionEvent)
 
   private var transport: TTransport = _
-  private var client: KyuubiSyncThriftClient = _
+  private var _client: KyuubiSyncThriftClient = _
+  def client: KyuubiSyncThriftClient = _client
 
-  private var _handle: SessionHandle = _
-  override def handle: SessionHandle = _handle
+  override val handle: SessionHandle = SessionHandle(protocol)
+  private var _engineSessionHandle: SessionHandle = _
 
   override def open(): Unit = {
     MetricsSystem.tracing { ms =>
       ms.incCount(CONN_TOTAL)
       ms.incCount(MetricRegistry.name(CONN_OPEN, user))
     }
-    withZkClient(sessionConf) { zkClient =>
-      val (host, port) = engine.getOrCreate(zkClient)
-      openSession(host, port)
-    }
-    // we should call super.open after kyuubi session is already opened
+
+    // we should call super.open before running launch engine operation
     super.open()
+
+    runOperation(launchEngineOp)
   }
 
-  private def openSession(host: String, port: Int): Unit = {
-    val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
-    val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
-    val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
-    transport = PlainSASLHelper.getPlainTransport(
-      user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
-    if (!transport.isOpen) {
-      transport.open()
-      logSessionInfo(s"Connected to engine [$host:$port]")
+  private[kyuubi] def openEngineSession(): Unit = {
+    withZkClient(sessionConf) { zkClient =>
+      val (host, port) = engine.getOrCreate(zkClient)
+      val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
+      val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
+      val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
+      transport = PlainSASLHelper.getPlainTransport(
+        user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
+      if (!transport.isOpen) {
+        transport.open()
+        logSessionInfo(s"Connected to engine [$host:$port]")
+      }
+      _client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
+      _engineSessionHandle = _client.openSession(protocol, user, passwd, 
normalizedConf)
+      logSessionInfo(s"Opened engine session[${_engineSessionHandle}]")
+      sessionEvent.openedTime = System.currentTimeMillis()
+      sessionEvent.sessionId = handle.identifier.toString
+      sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
+      sessionEvent.clientVersion = handle.protocol.getValue
+      EventLoggingService.onEvent(sessionEvent)
     }
-    client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
-    // use engine SessionHandle directly
-    _handle = client.openSession(protocol, user, passwd, normalizedConf)
-    sessionManager.operationManager.setConnection(handle, client)
-    sessionEvent.openedTime = System.currentTimeMillis()
-    sessionEvent.sessionId = handle.identifier.toString
-    sessionEvent.clientVersion = handle.protocol.getValue
-    EventLoggingService.onEvent(sessionEvent)
   }
 
   override protected def runOperation(operation: Operation): OperationHandle = 
{
-    sessionEvent.totalOperations += 1
+    if (operation != launchEngineOp) {
+      waitForEngineLaunched()
+      sessionEvent.totalOperations += 1
+    }
     super.runOperation(operation)
   }
 
+  private def waitForEngineLaunched(): Unit = {
+    if (!engineLaunched) {
+      Option(launchEngineOp).foreach { op =>
+        val waitingStartTime = System.currentTimeMillis()
+        logSessionInfo(s"Starting to wait the launch engine operation 
finished")
+
+        op.getBackgroundHandle.get()
+
+        val elapsedTime = System.currentTimeMillis() - waitingStartTime
+        logSessionInfo(s"Engine has been launched, elapsed time: ${elapsedTime 
/ 1000} s")
+
+        if (_engineSessionHandle == null) {
+          val ex = op.getStatus.exception.getOrElse(
+            KyuubiSQLException(s"Failed to launch engine for 
session[$handle]"))
+          throw ex
+        }
+
+        engineLaunched = true
+      }
+    }
+  }
+
   override def close(): Unit = {
     super.close()
     if (handle != null) {
-      sessionManager.operationManager.removeConnection(handle)
       
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
     }
     try {
-      if (client != null) client.closeSession()
+      if (_client != null) _client.closeSession()
     } catch {
       case e: TException =>
         throw KyuubiSQLException("Error while cleaning up the engine 
resources", e)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 7739861..b2fb9a7 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -107,11 +107,13 @@ class EventLoggingServiceSuite extends WithKyuubiServer 
with HiveJDBCTestHelper
         assert(res.getString("user") == Utils.currentUser)
         assert(res.getString("sessionName") == "test1")
         assert(res.getString("sessionId") == "")
+        assert(res.getString("remoteSessionId") == "")
         assert(res.getLong("startTime") > 0)
         assert(res.getInt("totalOperations") == 0)
         assert(res.next())
         assert(res.getInt("totalOperations") == 0)
         assert(res.getString("sessionId") != "")
+        assert(res.getString("remoteSessionId") != "")
         assert(res.getLong("openedTime") > 0)
         assert(res.next())
         assert(res.getInt("totalOperations") == 1)
@@ -121,7 +123,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer 
with HiveJDBCTestHelper
     }
   }
 
-  test("engine session id should be same with server session id") {
+  test("engine session id is not same with server session id") {
     val name = UUID.randomUUID().toString
     withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> name)) {
       withJdbcStatement() { statement =>
@@ -145,7 +147,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer 
with HiveJDBCTestHelper
         val res2 = statement.executeQuery(
           s"SELECT * FROM `json`.`$engineSessionEventPath` " +
             s"where sessionId = '$serverSessionId' limit 1")
-        assert(res2.next())
+        assert(!res2.next())
       }
     }
   }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 0a16da9..af3062d 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -97,4 +97,39 @@ class KyuubiOperationPerConnectionSuite extends 
WithKyuubiServer with HiveJDBCTe
         "Caused by: java.net.SocketException: Broken pipe (Write failed)")
     }
   }
+
+  test("test asynchronous open kyuubi session") {
+    withSessionConf(Map(
+      KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+      ))(Map.empty)(Map.empty) {
+      withSessionHandle { (client, handle) =>
+        val executeStmtReq = new TExecuteStatementReq()
+        executeStmtReq.setStatement("select engine_name()")
+        executeStmtReq.setSessionHandle(handle)
+        executeStmtReq.setRunAsync(false)
+        val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+        val getOpStatusReq = new 
TGetOperationStatusReq(executeStmtResp.getOperationHandle)
+        val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
+        assert(getOpStatusResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
+        assert(getOpStatusResp.getOperationState === 
TOperationState.FINISHED_STATE)
+      }
+    }
+  }
+
+  test("test asynchronous open kyuubi session failure") {
+    withSessionConf(Map(
+      KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true",
+      "spark.master" -> "invalid"
+    ))(Map.empty)(Map.empty) {
+      withSessionHandle { (client, handle) =>
+        val executeStmtReq = new TExecuteStatementReq()
+        executeStmtReq.setStatement("select engine_name()")
+        executeStmtReq.setSessionHandle(handle)
+        executeStmtReq.setRunAsync(false)
+        val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+        assert(executeStmtResp.getStatus.getStatusCode == 
TStatusCode.ERROR_STATUS)
+        
assert(executeStmtResp.getStatus.getErrorMessage.contains("kyuubi-spark-sql-engine.log"))
+      }
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index a6aa3a3..ee31b27 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -65,6 +65,24 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer 
with SparkQueryTests
     assert(r1 === r2)
   }
 
+  test("ensure open session asynchronously for USER mode still share the same 
engine") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SELECT engine_id()")
+      assert(resultSet.next())
+      val engineId = resultSet.getString(1)
+
+      withSessionConf(Map(
+        KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+      ))(Map.empty)(Map.empty) {
+        withJdbcStatement() { stmt =>
+          val rs = stmt.executeQuery("SELECT engine_id()")
+          assert(rs.next())
+          assert(rs.getString(1) == engineId)
+        }
+      }
+    }
+  }
+
   test("ensure two connections share the same engine when specifying 
subDomain.") {
     withSessionConf()(
       Map(

Reply via email to