This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 347c932  [KYUUBI #1032] Keep server session id and engine session id 
consistent
347c932 is described below

commit 347c93276812ec46bc1f62e7f0d6668615729939
Author: ulysses-you <[email protected]>
AuthorDate: Tue Sep 7 09:51:08 2021 +0800

    [KYUUBI #1032] Keep server session id and engine session id consistent
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Closes https://github.com/apache/incubator-kyuubi/issues/1032
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1033 from ulysses-you/engine-session-id.
    
    Closes #1032
    
    5aa5f5f8 [ulysses-you] indention
    a97ae1b5 [ulysses-you] nit
    aef754b4 [ulysses-you] engine ref id
    62399c40 [ulysses-you] cleanup
    2bf13a63 [ulysses-you] nit
    11e925dd [ulysses-you] engine side generate
    da3f51be [ulysses-you] nit
    6136f66b [ulysses-you] keep server session id and engine session id 
consistent
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../apache/kyuubi/ha/HighAvailabilityConf.scala    | 40 +++++++++----------
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala | 20 +++++-----
 .../kyuubi/client/KyuubiSyncThriftClient.scala     |  7 +++-
 .../scala/org/apache/kyuubi/engine/EngineRef.scala | 34 +++++++---------
 .../apache/kyuubi/events/KyuubiSessionEvent.scala  |  8 ++--
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 21 +++++++---
 .../kyuubi/session/KyuubiSessionManager.scala      |  7 ++--
 .../org/apache/kyuubi/engine/EngineRefSuite.scala  | 46 +++++++++++-----------
 .../kyuubi/events/EventLoggingServiceSuite.scala   | 35 ++++++++++++++++
 9 files changed, 133 insertions(+), 85 deletions(-)

diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index fdee134..dfc169f 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -85,27 +85,27 @@ object HighAvailabilityConf {
 
   val HA_ZK_CONN_RETRY_POLICY: ConfigEntry[String] =
     buildConf("ha.zookeeper.connection.retry.policy")
-    .doc("The retry policy for connecting to the zookeeper ensemble, all 
candidates are:" +
-      s" ${RetryPolicies.values.mkString("<ul><li>", "</li><li> ", 
"</li></ul>")}")
-    .version("1.0.0")
-    .stringConf
-    .checkValues(RetryPolicies.values.map(_.toString))
-    .createWithDefault(RetryPolicies.EXPONENTIAL_BACKOFF.toString)
+      .doc("The retry policy for connecting to the zookeeper ensemble, all 
candidates are:" +
+        s" ${RetryPolicies.values.mkString("<ul><li>", "</li><li> ", 
"</li></ul>")}")
+      .version("1.0.0")
+      .stringConf
+      .checkValues(RetryPolicies.values.map(_.toString))
+      .createWithDefault(RetryPolicies.EXPONENTIAL_BACKOFF.toString)
 
   val HA_ZK_NODE_TIMEOUT: ConfigEntry[Long] =
     buildConf("ha.zookeeper.node.creation.timeout")
-    .doc("Timeout for creating zookeeper node")
-    .version("1.2.0")
-    .timeConf
-    .checkValue(_ > 0, "Must be positive")
-    .createWithDefault(Duration.ofSeconds(120).toMillis)
-
-  val HA_ZK_ENGINE_SESSION_ID: OptionalConfigEntry[String] =
-    buildConf("ha.engine.session.id")
-    .doc("The sessionId will be attached to zookeeper node when engine 
started, " +
-      "and the kyuubi server will check it cyclically.")
-    .internal
-    .version("1.4.0")
-    .stringConf
-    .createOptional
+      .doc("Timeout for creating zookeeper node")
+      .version("1.2.0")
+      .timeConf
+      .checkValue(_ > 0, "Must be positive")
+      .createWithDefault(Duration.ofSeconds(120).toMillis)
+
+  val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
+    buildConf("ha.engine.ref.id")
+      .doc("The engine reference id will be attached to zookeeper node when 
engine started, " +
+        "and the kyuubi server will check it cyclically.")
+      .internal
+      .version("1.4.0")
+      .stringConf
+      .createOptional
 }
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index df106bc..2b640b8 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -170,12 +170,12 @@ object ServiceDiscovery extends Logging {
     }
   }
 
-  def getEngineBySessionId(
-     zkClient: CuratorFramework,
-     namespace: String,
-     sessionId: String): Option[(String, Int)] = {
+  def getEngineByRefId(
+      zkClient: CuratorFramework,
+      namespace: String,
+      engineRefId: String): Option[(String, Int)] = {
     getServiceNodesInfo(zkClient, namespace, silent = true)
-      .find(_.createSessionId.exists(_.equals(sessionId)))
+      .find(_.engineRefId.exists(_.equals(engineRefId)))
       .map(data => (data.host, data.port))
   }
 
@@ -194,9 +194,9 @@ object ServiceDiscovery extends Logging {
         val host = strings.head
         val port = strings(1).toInt
         val version = 
p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
-        val sessionId = 
p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session="))
+        val engineRefId = 
p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
         info(s"Get service instance:$instance and version:$version under 
$namespace")
-        ServiceNodeInfo(namespace, p, host, port, version, sessionId)
+        ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
       }
     } catch {
       case _: Exception if silent => Nil
@@ -226,8 +226,8 @@ object ServiceDiscovery extends Logging {
         throw new KyuubiException(s"Failed to create namespace '$ns'", e)
     }
 
-    val session = conf.get(HA_ZK_ENGINE_SESSION_ID)
-      .map(sid => s"session=$sid;").getOrElse("")
+    val session = conf.get(HA_ZK_ENGINE_REF_ID)
+      .map(refId => s"refId=$refId;").getOrElse("")
     val pathPrefix = ZKPaths.makePath(
       namespace,
       
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
@@ -265,6 +265,6 @@ case class ServiceNodeInfo(
     host: String,
     port: Int,
     version: Option[String],
-    createSessionId: Option[String]) {
+    engineRefId: Option[String]) {
   def instance: String = s"$host:$port"
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 75ecc96..6a386dd 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -27,6 +27,7 @@ import org.apache.thrift.protocol.TProtocol
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.operation.FetchOrientation
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.ThriftUtils
 
 class KyuubiSyncThriftClient(protocol: TProtocol) extends 
TCLIService.Client(protocol) {
@@ -48,11 +49,14 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends 
TCLIService.Client(pro
     } finally lock.unlock()
   }
 
+  /**
+   * Return the engine SessionHandle for kyuubi session so that we can get the 
same session id
+   */
   def openSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
-      configs: Map[String, String]): Unit = {
+      configs: Map[String, String]): SessionHandle = {
     val req = new TOpenSessionReq(protocol)
     req.setUsername(user)
     req.setPassword(password)
@@ -60,6 +64,7 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends 
TCLIService.Client(pro
     val resp = withLockAcquired(OpenSession(req))
     ThriftUtils.verifyTStatus(resp.getStatus)
     _remoteSessionHandle = resp.getSessionHandle
+    SessionHandle(_remoteSessionHandle, protocol)
   }
 
   def closeSession(): Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 219dcf8..61ef636 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.engine
 
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.util.Random
@@ -32,24 +33,25 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId
+import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
 import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, 
ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
-import org.apache.kyuubi.session.SessionHandle
 
 /**
  * The description and functionality of an engine at server side
  *
  * @param conf Engine configuration
  * @param user Caller of the engine
- * @param sessionId Id of the corresponding session in which the engine is 
created
+ * @param engineRefId Id of the corresponding session in which the engine is 
created
  */
-private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, 
sessionId: String)
+private[kyuubi] class EngineRef(
+    conf: KyuubiConf,
+    user: String,
+    engineRefId: String = UUID.randomUUID().toString)
   extends Logging {
-
   // The corresponding ServerSpace where the engine belongs to
   private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
 
@@ -93,10 +95,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, 
user: String, sessionI
    */
   @VisibleForTesting
   private[kyuubi] val defaultEngineName: String = shareLevel match {
-    case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
+    case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$engineRefId"
     case _ => subdomain match {
-      case Some(domain) => 
s"kyuubi_${shareLevel}_${appUser}_${domain}_$sessionId"
-      case _ => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
+      case Some(domain) => 
s"kyuubi_${shareLevel}_${appUser}_${domain}_$engineRefId"
+      case _ => s"kyuubi_${shareLevel}_${appUser}_$engineRefId"
     }
   }
 
@@ -104,14 +106,14 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, 
user: String, sessionI
    * The EngineSpace used to expose itself to the KyuubiServers in 
`serverSpace`
    *
    * For `CONNECTION` share level:
-   *   /`serverSpace_CONNECTION`/`user`/`sessionId`
+   *   /`serverSpace_CONNECTION`/`user`/`engineRefId`
    * For `USER` share level:
    *   /`serverSpace_USER`/`user`[/`subdomain`]
    *
    */
   @VisibleForTesting
   private[kyuubi] lazy val engineSpace: String = shareLevel match {
-    case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel", 
appUser, sessionId)
+    case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel", 
appUser, engineRefId)
     case _ => subdomain match {
       case Some(domain) => ZKPaths.makePath(s"${serverSpace}_$shareLevel", 
appUser, domain)
       case None => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser)
@@ -159,7 +161,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, 
user: String, sessionI
     conf.set(SparkProcessBuilder.TAG_KEY,
       conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + 
"KYUUBI")
     conf.set(HA_ZK_NAMESPACE, engineSpace)
-    conf.set(HA_ZK_ENGINE_SESSION_ID, sessionId)
+    conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
     val builder = new SparkProcessBuilder(appUser, conf)
     MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
     try {
@@ -186,7 +188,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, 
user: String, sessionI
             s"Timeout($timeout ms) to launched Spark with $builder",
             builder.getError)
         }
-        engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId)
+        engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
       }
       engineRef.get
     } finally {
@@ -206,9 +208,3 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, 
user: String, sessionI
       }
   }
 }
-
-private[kyuubi] object EngineRef {
-  def apply(conf: KyuubiConf, user: String, handle: SessionHandle): EngineRef 
= {
-    new EngineRef(conf, user, handle.identifier.toString)
-  }
-}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 668c80a..8e1115e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -32,18 +32,20 @@ import org.apache.kyuubi.session.KyuubiSessionImpl
  * @param clientVersion client version
  * @param conf session config
  * @param startTime session create time
+ * @param openedTime session opened time
  * @param endTime session end time
  * @param totalOperations how many queries and meta calls
  */
 case class KyuubiSessionEvent(
-    sessionId: String,
     sessionName: String,
     user: String,
     clientIP: String,
     serverIP: String,
-    clientVersion: Int,
     conf: Map[String, String],
     startTime: Long,
+    var sessionId: String = "",
+    var clientVersion: Int = -1,
+    var openedTime: Long = -1L,
     var endTime: Long = -1L,
     var totalOperations: Int = 0) extends KyuubiServerEvent {
   override def partitions: Seq[(String, String)] =
@@ -56,12 +58,10 @@ object KyuubiSessionEvent {
     val serverIP = KyuubiServer.kyuubiServer.connectionUrl
     val sessionName: String = 
session.normalizedConf.getOrElse(KyuubiConf.SESSION_NAME.key, "")
     KyuubiSessionEvent(
-      session.handle.identifier.toString,
       sessionName,
       session.user,
       session.ipAddress,
       serverIP,
-      session.handle.protocol.getValue,
       session.conf,
       session.createTime)
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 973e4c2..9f0067c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -53,23 +53,27 @@ class KyuubiSessionImpl(
     case (key, value) => sessionConf.set(key, value)
   }
 
-  private val engine: EngineRef = EngineRef(sessionConf, user, handle)
+  private val engine: EngineRef = new EngineRef(sessionConf, user)
   private val sessionEvent = KyuubiSessionEvent(this)
   EventLoggingService.onEvent(sessionEvent)
 
   private var transport: TTransport = _
   private var client: KyuubiSyncThriftClient = _
 
+  private var _handle: SessionHandle = _
+  override def handle: SessionHandle = _handle
+
   override def open(): Unit = {
     MetricsSystem.tracing { ms =>
       ms.incCount(CONN_TOTAL)
       ms.incCount(MetricRegistry.name(CONN_OPEN, user))
     }
-    super.open()
     withZkClient(sessionConf) { zkClient =>
       val (host, port) = engine.getOrCreate(zkClient)
       openSession(host, port)
     }
+    // we should call super.open after kyuubi session is already opened
+    super.open()
   }
 
   private def openSession(host: String, port: Int): Unit = {
@@ -82,8 +86,13 @@ class KyuubiSessionImpl(
       logSessionInfo(s"Connected to engine [$host:$port]")
     }
     client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
-    client.openSession(protocol, user, passwd, normalizedConf)
+    // use engine SessionHandle directly
+    _handle = client.openSession(protocol, user, passwd, normalizedConf)
     sessionManager.operationManager.setConnection(handle, client)
+    sessionEvent.openedTime = System.currentTimeMillis()
+    sessionEvent.sessionId = handle.identifier.toString
+    sessionEvent.clientVersion = handle.protocol.getValue
+    EventLoggingService.onEvent(sessionEvent)
   }
 
   override protected def runOperation(operation: Operation): OperationHandle = 
{
@@ -93,8 +102,10 @@ class KyuubiSessionImpl(
 
   override def close(): Unit = {
     super.close()
-    sessionManager.operationManager.removeConnection(handle)
-    
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
+    if (handle != null) {
+      sessionManager.operationManager.removeConnection(handle)
+      
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
+    }
     try {
       if (client != null) client.closeSession()
     } catch {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 760c57e..8bbb23b 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -56,9 +56,9 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       conf,
       this,
       this.getConf.getUserDefaults(user))
-    val handle = sessionImpl.handle
     try {
       sessionImpl.open()
+      val handle = sessionImpl.handle
       setSession(handle, sessionImpl)
       info(s"$username's session with $handle is opened, current opening 
sessions" +
       s" $getOpenSessionCount")
@@ -72,10 +72,11 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         try {
           sessionImpl.close()
         } catch {
-          case t: Throwable => warn(s"Error closing session $handle for 
$username", t)
+          case t: Throwable =>
+            warn(s"Error closing session for $username client ip: $ipAddress", 
t)
         }
         throw KyuubiSQLException(
-          s"Error opening session $handle for $username due to 
${e.getMessage}", e)
+          s"Error opening session for $username client ip $ipAddress, due to 
${e.getMessage}", e)
     }
   }
 
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 71cd73d..54bf41d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.kyuubi.engine
 
+import java.util.UUID
+
 import org.apache.curator.utils.ZKPaths
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@@ -26,7 +27,6 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
-import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.NamedThreadFactory
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -58,85 +58,85 @@ class EngineRefSuite extends KyuubiFunSuite {
   }
 
   test("CONNECTION shared level engine name") {
-    val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    val id = UUID.randomUUID().toString
     Seq(None, Some("suffix")).foreach { domain =>
       conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
       domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
-      val engine = EngineRef(conf, user, id)
+      val engine = new EngineRef(conf, user, id)
       assert(engine.engineSpace ===
-        ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString))
-      assert(engine.defaultEngineName === 
s"kyuubi_${CONNECTION}_${user}_${id.identifier}")
+        ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id))
+      assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_$id")
     }
   }
 
   test("USER shared level engine name") {
-    val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
-    val appName = EngineRef(conf, user, id)
+    val appName = new EngineRef(conf, user, id)
     assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_$USER", user))
-    assert(appName.defaultEngineName === 
s"kyuubi_${USER}_${user}_${id.identifier}")
+    assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_$id")
 
     Seq(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN,
       KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN).foreach { k =>
       conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
       conf.set(k.key, "abc")
-      val appName2 = EngineRef(conf, user, id)
+      val appName2 = new EngineRef(conf, user, id)
       assert(appName2.engineSpace ===
         ZKPaths.makePath(s"kyuubi_$USER", user, "abc"))
-      assert(appName2.defaultEngineName === 
s"kyuubi_${USER}_${user}_abc_${id.identifier}")
+      assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_$id")
     }
   }
 
   test("SERVER shared level engine name") {
-    val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
-    val appName = EngineRef(conf, user, id)
+    val appName = new EngineRef(conf, user, id)
     assert(appName.engineSpace ===
       ZKPaths.makePath(s"kyuubi_$SERVER", user))
-    assert(appName.defaultEngineName ===  
s"kyuubi_${SERVER}_${user}_${id.identifier}")
+    assert(appName.defaultEngineName ===  s"kyuubi_${SERVER}_${user}_$id")
 
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
-    val appName2 = EngineRef(conf, user, id)
+    val appName2 = new EngineRef(conf, user, id)
     assert(appName2.engineSpace ===
       ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc"))
-    assert(appName2.defaultEngineName ===  
s"kyuubi_${SERVER}_${user}_abc_${id.identifier}")
+    assert(appName2.defaultEngineName ===  s"kyuubi_${SERVER}_${user}_abc_$id")
   }
 
   test("check the engine space of engine pool") {
-    val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    val id = UUID.randomUUID().toString
 
     // test subdomain
     conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
-    val engine1 = EngineRef(conf, user, id)
+    val engine1 = new EngineRef(conf, user, id)
     assert(engine1.subdomain === Some("abc"))
 
     // unset domain
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
-    val engine2 = EngineRef(conf, user, id)
+    val engine2 = new EngineRef(conf, user, id)
     assert(engine2.subdomain === None)
 
     // 1 <= engine pool size < threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 3)
-    val engine3 = EngineRef(conf, user, id)
+    val engine3 = new EngineRef(conf, user, id)
     assert(engine3.subdomain.get.startsWith("engine-pool-"))
 
     // engine pool size > threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 100)
-    val engine4 = EngineRef(conf, user, id)
+    val engine4 = new EngineRef(conf, user, id)
     val engineNumber = Integer.parseInt(engine4.subdomain.get.substring(12))
     val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
     assert(engineNumber <= threshold)
   }
 
   test("start and get engine address with lock") {
-    val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
     conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
     conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
-    val engine = EngineRef(conf, user, id)
+    val engine = new EngineRef(conf, user, id)
 
     var port1 = 0
     var port2 = 0
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 4249787..4dc182f 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.events
 
 import java.net.InetAddress
 import java.nio.file.Paths
+import java.util.UUID
 
 import org.apache.kyuubi.{Utils, WithKyuubiServer}
 import org.apache.kyuubi.config.KyuubiConf
@@ -96,13 +97,47 @@ class EventLoggingServiceSuite extends WithKyuubiServer 
with JDBCTestUtils {
         assert(res.next())
         assert(res.getString("user") == Utils.currentUser)
         assert(res.getString("sessionName") == "test1")
+        assert(res.getString("sessionId") == "")
         assert(res.getLong("startTime") > 0)
         assert(res.getInt("totalOperations") == 0)
         assert(res.next())
+        assert(res.getInt("totalOperations") == 0)
+        assert(res.getString("sessionId") != "")
+        assert(res.getLong("openedTime") > 0)
+        assert(res.next())
         assert(res.getInt("totalOperations") == 1)
         assert(res.getLong("endTime") > 0)
         assert(!res.next())
       }
     }
   }
+
+  test("engine session id should be same with server session id") {
+    val name = UUID.randomUUID().toString
+    withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> name)) {
+      withJdbcStatement() { statement =>
+        statement.execute("SELECT 1")
+      }
+    }
+
+    val serverSessionEventPath =
+      Paths.get(logRoot.toString, "kyuubi_session", s"day=$currentDate")
+    val engineSessionEventPath =
+      Paths.get(logRoot.toString, "session", s"day=$currentDate")
+    withSessionConf()(Map.empty)(Map.empty) {
+      withJdbcStatement() { statement =>
+        val res = statement.executeQuery(
+          s"SELECT * FROM `json`.`$serverSessionEventPath` " +
+            s"where sessionName = '$name' and sessionId != '' limit 1")
+        assert(res.next())
+        val serverSessionId = res.getString("sessionId")
+        assert(!res.next())
+
+        val res2 = statement.executeQuery(
+          s"SELECT * FROM `json`.`$engineSessionEventPath` " +
+            s"where sessionId = '$serverSessionId' limit 1")
+        assert(res2.next())
+      }
+    }
+  }
 }

Reply via email to