This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new c1cedec70 [KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in
terminating checker
c1cedec70 is described below
commit c1cedec708ca63cff4d66221d295ff2e1906d5c3
Author: Wang, Fei <[email protected]>
AuthorDate: Thu May 9 08:48:21 2024 -0700
[KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in terminating
checker
# :mag: Description
## Issue References 🔗
This pull request fixes #
Follow-up of #4847.
Address comments:
https://github.com/apache/kyuubi/issues/4847#issuecomment-2072945805
## Describe Your Solution 🔧
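In this PR, the checks that decide whether the engine should terminate now
ignore alive-probe sessions. These checks are idle timeout, max lifetime,
fail-fast and graceful de-registration. Closing an alive-probe session also
no longer resets the idle timer.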
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6355 from turboFei/engine_idle.
Closes #4847
a8e26e71d [Wang, Fei] comments
418d0b41c [Wang, Fei] val
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 88b24601d07991dedb46a416cd507537effb7b7a)
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/kyuubi/engine/spark/SparkSQLEngine.scala | 8 ++++----
.../org/apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
.../scala/org/apache/kyuubi/session/AbstractSession.scala | 4 ++++
.../main/scala/org/apache/kyuubi/session/Session.scala | 2 ++
.../scala/org/apache/kyuubi/session/SessionManager.scala | 15 ++++++++++-----
.../org/apache/kyuubi/service/TFrontendServiceSuite.scala | 6 +++---
.../org/apache/kyuubi/ha/client/ServiceDiscovery.scala | 5 +++--
.../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala | 5 +++--
.../apache/kyuubi/server/api/v1/SessionsResource.scala | 2 +-
.../scala/org/apache/kyuubi/session/KyuubiSession.scala | 2 ++
.../org/apache/kyuubi/session/KyuubiSessionManager.scala | 2 +-
.../kyuubi/server/api/v1/BatchesResourceSuite.scala | 4 ++--
12 files changed, 36 insertions(+), 20 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 756c04969..3e3563f0b 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -109,7 +109,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
if (!shutdown.get) {
info(s"Spark engine is de-registering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
- while (backendService.sessionManager.getOpenSessionCount > 0) {
+ while (backendService.sessionManager.getActiveUserSessionCount > 0) {
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
info(s"Spark engine has no open session now, terminating.")
@@ -126,12 +126,12 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
Utils.tryLogNonFatalError {
ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
if (!shutdown.get) {
- while (backendService.sessionManager.getOpenSessionCount <= 0 &&
+ while (backendService.sessionManager.getActiveUserSessionCount <= 0
&&
System.currentTimeMillis() - startedTime < maxTimeout) {
info(s"Waiting for the initial connection")
Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis)
}
- if (backendService.sessionManager.getOpenSessionCount <= 0) {
+ if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
error(s"Spark engine has been terminated because no incoming
connection" +
s" for more than $maxTimeout ms, de-registering from engine
discovery space.")
assert(currentEngine.isDefined)
@@ -161,7 +161,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
}
- if (backendService.sessionManager.getOpenSessionCount <= 0) {
+ if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
info(s"Spark engine has been running for more than $maxLifetime
ms" +
s" and no open session now, terminating.")
stop()
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 592425a4b..9f22dd1f8 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -37,6 +37,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
+ final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
"kyuubi.session.engine.launch.handle.guid"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index a9e33f5a0..dd13ff314 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -260,4 +261,7 @@ abstract class AbstractSession(
override def open(): Unit = {
OperationLog.createOperationLogRootDirectory(this)
}
+
+ val isForAliveProbe: Boolean =
+
conf.get(KyuubiReservedKeys.KYUUBI_SESSION_ALIVE_PROBE).exists(_.equalsIgnoreCase("true"))
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index 2cdac9f3a..e1a1ec0a4 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -94,4 +94,6 @@ trait Session {
fetchLog: Boolean): TFetchResultsResp
def closeExpiredOperations(): Unit
+
+ def isForAliveProbe: Boolean
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 3e395659c..4137b3dbd 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -93,7 +93,7 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
protected def logSessionCountInfo(session: Session, action: String): Unit = {
info(s"${session.user}'s ${session.getClass.getSimpleName} with" +
s" ${session.handle}${session.name.map("/" + _).getOrElse("")} is
$action," +
- s" current opening sessions $getOpenSessionCount")
+ s" current opening sessions $getActiveUserSessionCount")
}
def openSession(
@@ -123,11 +123,13 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
}
def closeSession(sessionHandle: SessionHandle): Unit = {
- _latestLogoutTime = System.currentTimeMillis()
val session = handleToSession.remove(sessionHandle)
if (session == null) {
throw KyuubiSQLException(s"Invalid $sessionHandle")
}
+ if (!session.isForAliveProbe) {
+ _latestLogoutTime = System.currentTimeMillis()
+ }
logSessionCountInfo(session, "closed")
try {
session.close()
@@ -160,7 +162,10 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
handleToSession.put(sessionHandle, session)
}
- def getOpenSessionCount: Int = handleToSession.size()
+ /**
+ * Get the count of active user sessions, which excludes alive probe
sessions.
+ */
+ def getActiveUserSessionCount: Int =
handleToSession.values().asScala.count(!_.isForAliveProbe)
def allSessions(): Iterable[Session] = handleToSession.values().asScala
@@ -304,7 +309,7 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
val checkTask = new Runnable {
override def run(): Unit = {
- info(s"Checking sessions timeout, current count: $getOpenSessionCount")
+ info(s"Checking sessions timeout, current count:
$getActiveUserSessionCount")
val current = System.currentTimeMillis
if (!shutdown) {
for (session <- handleToSession.values().asScala) {
@@ -342,7 +347,7 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
val checkTask = new Runnable {
override def run(): Unit = {
if (!shutdown && System.currentTimeMillis() - latestLogoutTime >
idleTimeout &&
- getOpenSessionCount <= 0) {
+ getActiveUserSessionCount <= 0) {
info(s"Idled for more than $idleTimeout ms, terminating")
stop()
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index bd485a262..ad8045225 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -549,13 +549,13 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
.getSession(SessionHandle(handle))
.asInstanceOf[AbstractSession]
var lastAccessTime = session.lastAccessTime
- assert(sessionManager.getOpenSessionCount === 1)
+ assert(sessionManager.getActiveUserSessionCount === 1)
assert(session.lastIdleTime > 0)
val cancelOpReq = new TCancelOperationReq(resp.getOperationHandle)
val cancelOpResp = client.CancelOperation(cancelOpReq)
assert(cancelOpResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
- assert(sessionManager.getOpenSessionCount === 1)
+ assert(sessionManager.getActiveUserSessionCount === 1)
assert(session.lastIdleTime === 0)
lastAccessTime = session.lastAccessTime
@@ -569,7 +569,7 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
assert(session.lastAccessTime > lastAccessTime)
}
info("session is terminated")
- assert(sessionManager.getOpenSessionCount === 0)
+ assert(sessionManager.getActiveUserSessionCount === 0)
}
}
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index c7eee1503..2968c4f96 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -66,8 +66,9 @@ abstract class ServiceDiscovery(
// stop the server genteelly
def stopGracefully(isLost: Boolean = false): Unit = {
- while (fe.be.sessionManager.getOpenSessionCount > 0) {
- info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are
active, delay shutdown")
+ def activeSessionCount: Int = fe.be.sessionManager.getActiveUserSessionCount // def, not val: re-read on every check, otherwise the loop never exits
+ while (activeSessionCount > 0) {
+ info(s"$activeSessionCount connection(s) are active, delay shutdown")
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
isServerLost.set(isLost)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 9691852b9..160492fcb 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -185,7 +185,7 @@ class KyuubiSyncThriftClient private (
val req = new TOpenSessionReq(protocol)
req.setUsername(user)
req.setPassword(password)
- req.setConfiguration(configs.asJava)
+ req.setConfiguration((configs ++ Map(KYUUBI_SESSION_ALIVE_PROBE ->
"false")).asJava)
val resp = withLockAcquired(OpenSession(req))
ThriftUtils.verifyTStatus(resp.getStatus)
_remoteSessionHandle = resp.getSessionHandle
@@ -205,7 +205,8 @@ class KyuubiSyncThriftClient private (
req.setConfiguration((configs ++ Map(
KyuubiConf.SESSION_NAME.key -> sessionName,
KYUUBI_SESSION_HANDLE_KEY -> UUID.randomUUID().toString,
- KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "")).asJava)
+ KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "",
+ KYUUBI_SESSION_ALIVE_PROBE -> "true")).asJava)
val resp = aliveProbeClient.OpenSession(req)
ThriftUtils.verifyTStatus(resp.getStatus)
_aliveProbeSessionHandle = resp.getSessionHandle
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index 10a557867..65210fe1a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -128,7 +128,7 @@ private[v1] class SessionsResource extends
ApiRequestContext with Logging {
@GET
@Path("count")
def sessionCount(): SessionOpenCount = {
- new SessionOpenCount(sessionManager.getOpenSessionCount)
+ new SessionOpenCount(sessionManager.getActiveUserSessionCount)
}
@ApiResponse(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
index a4c345af3..78974e65a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
@@ -70,4 +70,6 @@ abstract class KyuubiSession(
ms.decCount(MetricRegistry.name(CONN_OPEN, user, sessionType.toString))
ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
}
+
+ override val isForAliveProbe: Boolean = false
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 5500a4711..bab6606c8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -285,7 +285,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
override def start(): Unit = synchronized {
MetricsSystem.tracing { ms =>
- ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
+ ms.registerGauge(CONN_OPEN, getActiveUserSessionCount, 0)
ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 7f749d8d7..de72e8fb2 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -524,7 +524,7 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
val sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val kyuubiInstance = fe.connectionUrl
- assert(sessionManager.getOpenSessionCount === 0)
+ assert(sessionManager.getActiveUserSessionCount === 0)
val batchId1 = UUID.randomUUID().toString
val batchId2 = UUID.randomUUID().toString
@@ -584,7 +584,7 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
restFe.recoverBatchSessions()
- assert(sessionManager.getOpenSessionCount === 2)
+ assert(sessionManager.getActiveUserSessionCount === 2)
val sessionHandle1 = SessionHandle.fromUUID(batchId1)
val sessionHandle2 = SessionHandle.fromUUID(batchId2)