This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 347c932 [KYUUBI #1032] Keep server session id and engine session id
consistent
347c932 is described below
commit 347c93276812ec46bc1f62e7f0d6668615729939
Author: ulysses-you <[email protected]>
AuthorDate: Tue Sep 7 09:51:08 2021 +0800
[KYUUBI #1032] Keep server session id and engine session id consistent
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Closes https://github.com/apache/incubator-kyuubi/issues/1032
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1033 from ulysses-you/engine-session-id.
Closes #1032
5aa5f5f8 [ulysses-you] indention
a97ae1b5 [ulysses-you] nit
aef754b4 [ulysses-you] engine ref id
62399c40 [ulysses-you] cleanup
2bf13a63 [ulysses-you] nit
11e925dd [ulysses-you] engine side generate
da3f51be [ulysses-you] nit
6136f66b [ulysses-you] keep server session id and engine session id
consistent
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../apache/kyuubi/ha/HighAvailabilityConf.scala | 40 +++++++++----------
.../apache/kyuubi/ha/client/ServiceDiscovery.scala | 20 +++++-----
.../kyuubi/client/KyuubiSyncThriftClient.scala | 7 +++-
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 34 +++++++---------
.../apache/kyuubi/events/KyuubiSessionEvent.scala | 8 ++--
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 21 +++++++---
.../kyuubi/session/KyuubiSessionManager.scala | 7 ++--
.../org/apache/kyuubi/engine/EngineRefSuite.scala | 46 +++++++++++-----------
.../kyuubi/events/EventLoggingServiceSuite.scala | 35 ++++++++++++++++
9 files changed, 133 insertions(+), 85 deletions(-)
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index fdee134..dfc169f 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -85,27 +85,27 @@ object HighAvailabilityConf {
val HA_ZK_CONN_RETRY_POLICY: ConfigEntry[String] =
buildConf("ha.zookeeper.connection.retry.policy")
- .doc("The retry policy for connecting to the zookeeper ensemble, all
candidates are:" +
- s" ${RetryPolicies.values.mkString("<ul><li>", "</li><li> ",
"</li></ul>")}")
- .version("1.0.0")
- .stringConf
- .checkValues(RetryPolicies.values.map(_.toString))
- .createWithDefault(RetryPolicies.EXPONENTIAL_BACKOFF.toString)
+ .doc("The retry policy for connecting to the zookeeper ensemble, all
candidates are:" +
+ s" ${RetryPolicies.values.mkString("<ul><li>", "</li><li> ",
"</li></ul>")}")
+ .version("1.0.0")
+ .stringConf
+ .checkValues(RetryPolicies.values.map(_.toString))
+ .createWithDefault(RetryPolicies.EXPONENTIAL_BACKOFF.toString)
val HA_ZK_NODE_TIMEOUT: ConfigEntry[Long] =
buildConf("ha.zookeeper.node.creation.timeout")
- .doc("Timeout for creating zookeeper node")
- .version("1.2.0")
- .timeConf
- .checkValue(_ > 0, "Must be positive")
- .createWithDefault(Duration.ofSeconds(120).toMillis)
-
- val HA_ZK_ENGINE_SESSION_ID: OptionalConfigEntry[String] =
- buildConf("ha.engine.session.id")
- .doc("The sessionId will be attached to zookeeper node when engine
started, " +
- "and the kyuubi server will check it cyclically.")
- .internal
- .version("1.4.0")
- .stringConf
- .createOptional
+ .doc("Timeout for creating zookeeper node")
+ .version("1.2.0")
+ .timeConf
+ .checkValue(_ > 0, "Must be positive")
+ .createWithDefault(Duration.ofSeconds(120).toMillis)
+
+ val HA_ZK_ENGINE_REF_ID: OptionalConfigEntry[String] =
+ buildConf("ha.engine.ref.id")
+ .doc("The engine reference id will be attached to zookeeper node when
engine started, " +
+ "and the kyuubi server will check it cyclically.")
+ .internal
+ .version("1.4.0")
+ .stringConf
+ .createOptional
}
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index df106bc..2b640b8 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -170,12 +170,12 @@ object ServiceDiscovery extends Logging {
}
}
- def getEngineBySessionId(
- zkClient: CuratorFramework,
- namespace: String,
- sessionId: String): Option[(String, Int)] = {
+ def getEngineByRefId(
+ zkClient: CuratorFramework,
+ namespace: String,
+ engineRefId: String): Option[(String, Int)] = {
getServiceNodesInfo(zkClient, namespace, silent = true)
- .find(_.createSessionId.exists(_.equals(sessionId)))
+ .find(_.engineRefId.exists(_.equals(engineRefId)))
.map(data => (data.host, data.port))
}
@@ -194,9 +194,9 @@ object ServiceDiscovery extends Logging {
val host = strings.head
val port = strings(1).toInt
val version =
p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
- val sessionId =
p.split(";").find(_.startsWith("session=")).map(_.stripPrefix("session="))
+ val engineRefId =
p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
info(s"Get service instance:$instance and version:$version under
$namespace")
- ServiceNodeInfo(namespace, p, host, port, version, sessionId)
+ ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
}
} catch {
case _: Exception if silent => Nil
@@ -226,8 +226,8 @@ object ServiceDiscovery extends Logging {
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
- val session = conf.get(HA_ZK_ENGINE_SESSION_ID)
- .map(sid => s"session=$sid;").getOrElse("")
+ val session = conf.get(HA_ZK_ENGINE_REF_ID)
+ .map(refId => s"refId=$refId;").getOrElse("")
val pathPrefix = ZKPaths.makePath(
namespace,
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
@@ -265,6 +265,6 @@ case class ServiceNodeInfo(
host: String,
port: Int,
version: Option[String],
- createSessionId: Option[String]) {
+ engineRefId: Option[String]) {
def instance: String = s"$host:$port"
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 75ecc96..6a386dd 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -27,6 +27,7 @@ import org.apache.thrift.protocol.TProtocol
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.ThriftUtils
class KyuubiSyncThriftClient(protocol: TProtocol) extends
TCLIService.Client(protocol) {
@@ -48,11 +49,14 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends
TCLIService.Client(pro
} finally lock.unlock()
}
+ /**
+ * Return the engine SessionHandle for kyuubi session so that we can get the
same session id
+ */
def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
- configs: Map[String, String]): Unit = {
+ configs: Map[String, String]): SessionHandle = {
val req = new TOpenSessionReq(protocol)
req.setUsername(user)
req.setPassword(password)
@@ -60,6 +64,7 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends
TCLIService.Client(pro
val resp = withLockAcquired(OpenSession(req))
ThriftUtils.verifyTStatus(resp.getStatus)
_remoteSessionHandle = resp.getSessionHandle
+ SessionHandle(_remoteSessionHandle, protocol)
}
def closeSession(): Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 219dcf8..61ef636 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine
+import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.util.Random
@@ -32,24 +33,25 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineBySessionId
+import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL,
ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
-import org.apache.kyuubi.session.SessionHandle
/**
* The description and functionality of an engine at server side
*
* @param conf Engine configuration
* @param user Caller of the engine
- * @param sessionId Id of the corresponding session in which the engine is
created
+ * @param engineRefId Unique reference id of the engine, attached to its zookeeper
node when the engine starts; defaults to a random UUID
*/
-private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String,
sessionId: String)
+private[kyuubi] class EngineRef(
+ conf: KyuubiConf,
+ user: String,
+ engineRefId: String = UUID.randomUUID().toString)
extends Logging {
-
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
@@ -93,10 +95,10 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf,
user: String, sessionI
*/
@VisibleForTesting
private[kyuubi] val defaultEngineName: String = shareLevel match {
- case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
+ case CONNECTION => s"kyuubi_${shareLevel}_${appUser}_$engineRefId"
case _ => subdomain match {
- case Some(domain) =>
s"kyuubi_${shareLevel}_${appUser}_${domain}_$sessionId"
- case _ => s"kyuubi_${shareLevel}_${appUser}_$sessionId"
+ case Some(domain) =>
s"kyuubi_${shareLevel}_${appUser}_${domain}_$engineRefId"
+ case _ => s"kyuubi_${shareLevel}_${appUser}_$engineRefId"
}
}
@@ -104,14 +106,14 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf,
user: String, sessionI
* The EngineSpace used to expose itself to the KyuubiServers in
`serverSpace`
*
* For `CONNECTION` share level:
- * /`serverSpace_CONNECTION`/`user`/`sessionId`
+ * /`serverSpace_CONNECTION`/`user`/`engineRefId`
* For `USER` share level:
* /`serverSpace_USER`/`user`[/`subdomain`]
*
*/
@VisibleForTesting
private[kyuubi] lazy val engineSpace: String = shareLevel match {
- case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel",
appUser, sessionId)
+ case CONNECTION => ZKPaths.makePath(s"${serverSpace}_$shareLevel",
appUser, engineRefId)
case _ => subdomain match {
case Some(domain) => ZKPaths.makePath(s"${serverSpace}_$shareLevel",
appUser, domain)
case None => ZKPaths.makePath(s"${serverSpace}_$shareLevel", appUser)
@@ -159,7 +161,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf,
user: String, sessionI
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
"KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
- conf.set(HA_ZK_ENGINE_SESSION_ID, sessionId)
+ conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
@@ -186,7 +188,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf,
user: String, sessionI
s"Timeout($timeout ms) to launched Spark with $builder",
builder.getError)
}
- engineRef = getEngineBySessionId(zkClient, engineSpace, sessionId)
+ engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
}
engineRef.get
} finally {
@@ -206,9 +208,3 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf,
user: String, sessionI
}
}
}
-
-private[kyuubi] object EngineRef {
- def apply(conf: KyuubiConf, user: String, handle: SessionHandle): EngineRef
= {
- new EngineRef(conf, user, handle.identifier.toString)
- }
-}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 668c80a..8e1115e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -32,18 +32,20 @@ import org.apache.kyuubi.session.KyuubiSessionImpl
* @param clientVersion client version
* @param conf session config
* @param startTime session create time
+ * @param openedTime session opened time
* @param endTime session end time
* @param totalOperations how many queries and meta calls
*/
case class KyuubiSessionEvent(
- sessionId: String,
sessionName: String,
user: String,
clientIP: String,
serverIP: String,
- clientVersion: Int,
conf: Map[String, String],
startTime: Long,
+ var sessionId: String = "",
+ var clientVersion: Int = -1,
+ var openedTime: Long = -1L,
var endTime: Long = -1L,
var totalOperations: Int = 0) extends KyuubiServerEvent {
override def partitions: Seq[(String, String)] =
@@ -56,12 +58,10 @@ object KyuubiSessionEvent {
val serverIP = KyuubiServer.kyuubiServer.connectionUrl
val sessionName: String =
session.normalizedConf.getOrElse(KyuubiConf.SESSION_NAME.key, "")
KyuubiSessionEvent(
- session.handle.identifier.toString,
sessionName,
session.user,
session.ipAddress,
serverIP,
- session.handle.protocol.getValue,
session.conf,
session.createTime)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 973e4c2..9f0067c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -53,23 +53,27 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
- private val engine: EngineRef = EngineRef(sessionConf, user, handle)
+ private val engine: EngineRef = new EngineRef(sessionConf, user)
private val sessionEvent = KyuubiSessionEvent(this)
EventLoggingService.onEvent(sessionEvent)
private var transport: TTransport = _
private var client: KyuubiSyncThriftClient = _
+ private var _handle: SessionHandle = _
+ override def handle: SessionHandle = _handle
+
override def open(): Unit = {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_TOTAL)
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
}
- super.open()
withZkClient(sessionConf) { zkClient =>
val (host, port) = engine.getOrCreate(zkClient)
openSession(host, port)
}
+ // we should call super.open after kyuubi session is already opened
+ super.open()
}
private def openSession(host: String, port: Int): Unit = {
@@ -82,8 +86,13 @@ class KyuubiSessionImpl(
logSessionInfo(s"Connected to engine [$host:$port]")
}
client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
- client.openSession(protocol, user, passwd, normalizedConf)
+ // use engine SessionHandle directly
+ _handle = client.openSession(protocol, user, passwd, normalizedConf)
sessionManager.operationManager.setConnection(handle, client)
+ sessionEvent.openedTime = System.currentTimeMillis()
+ sessionEvent.sessionId = handle.identifier.toString
+ sessionEvent.clientVersion = handle.protocol.getValue
+ EventLoggingService.onEvent(sessionEvent)
}
override protected def runOperation(operation: Operation): OperationHandle =
{
@@ -93,8 +102,10 @@ class KyuubiSessionImpl(
override def close(): Unit = {
super.close()
- sessionManager.operationManager.removeConnection(handle)
-
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
+ if (handle != null) {
+ sessionManager.operationManager.removeConnection(handle)
+
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
+ }
try {
if (client != null) client.closeSession()
} catch {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 760c57e..8bbb23b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -56,9 +56,9 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
conf,
this,
this.getConf.getUserDefaults(user))
- val handle = sessionImpl.handle
try {
sessionImpl.open()
+ val handle = sessionImpl.handle
setSession(handle, sessionImpl)
info(s"$username's session with $handle is opened, current opening
sessions" +
s" $getOpenSessionCount")
@@ -72,10 +72,11 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
try {
sessionImpl.close()
} catch {
- case t: Throwable => warn(s"Error closing session $handle for
$username", t)
+ case t: Throwable =>
+ warn(s"Error closing session for $username client ip: $ipAddress",
t)
}
throw KyuubiSQLException(
- s"Error opening session $handle for $username due to
${e.getMessage}", e)
+ s"Error opening session for $username client ip $ipAddress, due to
${e.getMessage}", e)
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 71cd73d..54bf41d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -17,8 +17,9 @@
package org.apache.kyuubi.engine
+import java.util.UUID
+
import org.apache.curator.utils.ZKPaths
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@@ -26,7 +27,6 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
-import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -58,85 +58,85 @@ class EngineRefSuite extends KyuubiFunSuite {
}
test("CONNECTION shared level engine name") {
- val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val id = UUID.randomUUID().toString
Seq(None, Some("suffix")).foreach { domain =>
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
- val engine = EngineRef(conf, user, id)
+ val engine = new EngineRef(conf, user, id)
assert(engine.engineSpace ===
- ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id.identifier.toString))
- assert(engine.defaultEngineName ===
s"kyuubi_${CONNECTION}_${user}_${id.identifier}")
+ ZKPaths.makePath(s"kyuubi_$CONNECTION", user, id))
+ assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${user}_$id")
}
}
test("USER shared level engine name") {
- val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
- val appName = EngineRef(conf, user, id)
+ val appName = new EngineRef(conf, user, id)
assert(appName.engineSpace === ZKPaths.makePath(s"kyuubi_$USER", user))
- assert(appName.defaultEngineName ===
s"kyuubi_${USER}_${user}_${id.identifier}")
+ assert(appName.defaultEngineName === s"kyuubi_${USER}_${user}_$id")
Seq(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN,
KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN).foreach { k =>
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(k.key, "abc")
- val appName2 = EngineRef(conf, user, id)
+ val appName2 = new EngineRef(conf, user, id)
assert(appName2.engineSpace ===
ZKPaths.makePath(s"kyuubi_$USER", user, "abc"))
- assert(appName2.defaultEngineName ===
s"kyuubi_${USER}_${user}_abc_${id.identifier}")
+ assert(appName2.defaultEngineName === s"kyuubi_${USER}_${user}_abc_$id")
}
}
test("SERVER shared level engine name") {
- val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
- val appName = EngineRef(conf, user, id)
+ val appName = new EngineRef(conf, user, id)
assert(appName.engineSpace ===
ZKPaths.makePath(s"kyuubi_$SERVER", user))
- assert(appName.defaultEngineName ===
s"kyuubi_${SERVER}_${user}_${id.identifier}")
+ assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${user}_$id")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
- val appName2 = EngineRef(conf, user, id)
+ val appName2 = new EngineRef(conf, user, id)
assert(appName2.engineSpace ===
ZKPaths.makePath(s"kyuubi_$SERVER", user, "abc"))
- assert(appName2.defaultEngineName ===
s"kyuubi_${SERVER}_${user}_abc_${id.identifier}")
+ assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_$id")
}
test("check the engine space of engine pool") {
- val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val id = UUID.randomUUID().toString
// test subdomain
conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
- val engine1 = EngineRef(conf, user, id)
+ val engine1 = new EngineRef(conf, user, id)
assert(engine1.subdomain === Some("abc"))
// unset domain
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
- val engine2 = EngineRef(conf, user, id)
+ val engine2 = new EngineRef(conf, user, id)
assert(engine2.subdomain === None)
// 1 <= engine pool size < threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 3)
- val engine3 = EngineRef(conf, user, id)
+ val engine3 = new EngineRef(conf, user, id)
assert(engine3.subdomain.get.startsWith("engine-pool-"))
// engine pool size > threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 100)
- val engine4 = EngineRef(conf, user, id)
+ val engine4 = new EngineRef(conf, user, id)
val engineNumber = Integer.parseInt(engine4.subdomain.get.substring(12))
val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
assert(engineNumber <= threshold)
}
test("start and get engine address with lock") {
- val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
- val engine = EngineRef(conf, user, id)
+ val engine = new EngineRef(conf, user, id)
var port1 = 0
var port2 = 0
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 4249787..4dc182f 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.events
import java.net.InetAddress
import java.nio.file.Paths
+import java.util.UUID
import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
@@ -96,13 +97,47 @@ class EventLoggingServiceSuite extends WithKyuubiServer
with JDBCTestUtils {
assert(res.next())
assert(res.getString("user") == Utils.currentUser)
assert(res.getString("sessionName") == "test1")
+ assert(res.getString("sessionId") == "")
assert(res.getLong("startTime") > 0)
assert(res.getInt("totalOperations") == 0)
assert(res.next())
+ assert(res.getInt("totalOperations") == 0)
+ assert(res.getString("sessionId") != "")
+ assert(res.getLong("openedTime") > 0)
+ assert(res.next())
assert(res.getInt("totalOperations") == 1)
assert(res.getLong("endTime") > 0)
assert(!res.next())
}
}
}
+
+ test("engine session id should be same with server session id") {
+ val name = UUID.randomUUID().toString
+ withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> name)) {
+ withJdbcStatement() { statement =>
+ statement.execute("SELECT 1")
+ }
+ }
+
+ val serverSessionEventPath =
+ Paths.get(logRoot.toString, "kyuubi_session", s"day=$currentDate")
+ val engineSessionEventPath =
+ Paths.get(logRoot.toString, "session", s"day=$currentDate")
+ withSessionConf()(Map.empty)(Map.empty) {
+ withJdbcStatement() { statement =>
+ val res = statement.executeQuery(
+ s"SELECT * FROM `json`.`$serverSessionEventPath` " +
+ s"where sessionName = '$name' and sessionId != '' limit 1")
+ assert(res.next())
+ val serverSessionId = res.getString("sessionId")
+ assert(!res.next())
+
+ val res2 = statement.executeQuery(
+ s"SELECT * FROM `json`.`$engineSessionEventPath` " +
+ s"where sessionId = '$serverSessionId' limit 1")
+ assert(res2.next())
+ }
+ }
+ }
}