This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new d17efd110 [KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in terminating checker
d17efd110 is described below

commit d17efd11022242081322990a1c7ce910c685c9b1
Author: Wang, Fei <[email protected]>
AuthorDate: Thu May 9 08:48:21 2024 -0700

    [KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in terminating checker
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request is a follow-up of #4847.
    
    Addresses the review comments in https://github.com/apache/kyuubi/issues/4847#issuecomment-2072945805
    ## Describe Your Solution 🔧
    
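    In this PR, the engine termination checks ignore alive probe sessions: an engine whose only open sessions are alive probes is treated as having no open sessions, and closing an alive probe session no longer refreshes the latest logout time used by the idle checker.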
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6355 from turboFei/engine_idle.
    
    Closes #4847
    
    a8e26e71d [Wang, Fei] comments
    418d0b41c [Wang, Fei] val
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 88b24601d07991dedb46a416cd507537effb7b7a)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/kyuubi/engine/spark/SparkSQLEngine.scala   |  8 ++++----
 .../org/apache/kyuubi/config/KyuubiReservedKeys.scala     |  1 +
 .../scala/org/apache/kyuubi/session/AbstractSession.scala |  4 ++++
 .../main/scala/org/apache/kyuubi/session/Session.scala    |  2 ++
 .../scala/org/apache/kyuubi/session/SessionManager.scala  | 15 ++++++++++-----
 .../org/apache/kyuubi/service/TFrontendServiceSuite.scala |  6 +++---
 .../org/apache/kyuubi/ha/client/ServiceDiscovery.scala    |  5 +++--
 .../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala |  5 +++--
 .../apache/kyuubi/server/api/v1/SessionsResource.scala    |  2 +-
 .../scala/org/apache/kyuubi/session/KyuubiSession.scala   |  2 ++
 .../org/apache/kyuubi/session/KyuubiSessionManager.scala  |  2 +-
 .../kyuubi/server/api/v1/BatchesResourceSuite.scala       |  4 ++--
 12 files changed, 36 insertions(+), 20 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6dd438ffd..5ed67963b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -128,7 +128,7 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
       if (!shutdown.get) {
         info(s"Spark engine is de-registering from engine discovery space.")
         frontendServices.flatMap(_.discoveryService).foreach(_.stop())
-        while (backendService.sessionManager.getOpenSessionCount > 0) {
+        while (backendService.sessionManager.getActiveUserSessionCount > 0) {
           Thread.sleep(TimeUnit.SECONDS.toMillis(10))
         }
         info(s"Spark engine has no open session now, terminating.")
@@ -145,12 +145,12 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
     Utils.tryLogNonFatalError {
       ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
         if (!shutdown.get) {
-          while (backendService.sessionManager.getOpenSessionCount <= 0 &&
+          while (backendService.sessionManager.getActiveUserSessionCount <= 0 
&&
             System.currentTimeMillis() - startedTime < maxTimeout) {
             info(s"Waiting for the initial connection")
             Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis)
           }
-          if (backendService.sessionManager.getOpenSessionCount <= 0) {
+          if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
             error(s"Spark engine has been terminated because no incoming 
connection" +
               s" for more than $maxTimeout ms, de-registering from engine 
discovery space.")
             assert(currentEngine.isDefined)
@@ -180,7 +180,7 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
             frontendServices.flatMap(_.discoveryService).foreach(_.stop())
           }
 
-          if (backendService.sessionManager.getOpenSessionCount <= 0) {
+          if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
             info(s"Spark engine has been running for more than $maxLifetime 
ms" +
               s" and no open session now, terminating.")
             stop()
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 592425a4b..9f22dd1f8 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -37,6 +37,7 @@ object KyuubiReservedKeys {
   final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
   final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
   final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
+  final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
     "kyuubi.session.engine.launch.handle.guid"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index a00a12c1f..2dfbe510f 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -259,4 +260,7 @@ abstract class AbstractSession(
   override def open(): Unit = {
     OperationLog.createOperationLogRootDirectory(this)
   }
+
+  val isForAliveProbe: Boolean =
+    
conf.get(KyuubiReservedKeys.KYUUBI_SESSION_ALIVE_PROBE).exists(_.equalsIgnoreCase("true"))
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index c618c0480..dd9f69fb9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -93,4 +93,6 @@ trait Session {
       fetchLog: Boolean): TFetchResultsResp
 
   def closeExpiredOperations(): Unit
+
+  def isForAliveProbe: Boolean
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 3a4dab54c..7751b7298 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -92,7 +92,7 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
   protected def logSessionCountInfo(session: Session, action: String): Unit = {
     info(s"${session.user}'s ${session.getClass.getSimpleName} with" +
       s" ${session.handle}${session.name.map("/" + _).getOrElse("")} is 
$action," +
-      s" current opening sessions $getOpenSessionCount")
+      s" current opening sessions $getActiveUserSessionCount")
   }
 
   def openSession(
@@ -122,11 +122,13 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
   }
 
   def closeSession(sessionHandle: SessionHandle): Unit = {
-    _latestLogoutTime = System.currentTimeMillis()
     val session = handleToSession.remove(sessionHandle)
     if (session == null) {
       throw KyuubiSQLException(s"Invalid $sessionHandle")
     }
+    if (!session.isForAliveProbe) {
+      _latestLogoutTime = System.currentTimeMillis()
+    }
     logSessionCountInfo(session, "closed")
     try {
       session.close()
@@ -159,7 +161,10 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
     handleToSession.put(sessionHandle, session)
   }
 
-  def getOpenSessionCount: Int = handleToSession.size()
+  /**
+   * Get the count of active user sessions, which excludes alive probe 
sessions.
+   */
+  def getActiveUserSessionCount: Int = 
handleToSession.values().asScala.count(!_.isForAliveProbe)
 
   def allSessions(): Iterable[Session] = handleToSession.values().asScala
 
@@ -303,7 +308,7 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
 
     val checkTask = new Runnable {
       override def run(): Unit = {
-        info(s"Checking sessions timeout, current count: $getOpenSessionCount")
+        info(s"Checking sessions timeout, current count: 
$getActiveUserSessionCount")
         val current = System.currentTimeMillis
         if (!shutdown) {
           for (session <- handleToSession.values().asScala) {
@@ -341,7 +346,7 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
       val checkTask = new Runnable {
         override def run(): Unit = {
           if (!shutdown && System.currentTimeMillis() - latestLogoutTime > 
idleTimeout &&
-            getOpenSessionCount <= 0) {
+            getActiveUserSessionCount <= 0) {
             info(s"Idled for more than $idleTimeout ms, terminating")
             stop()
           }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index 246fc59ad..0e3e98606 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -549,13 +549,13 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
         .getSession(SessionHandle(handle))
         .asInstanceOf[AbstractSession]
       var lastAccessTime = session.lastAccessTime
-      assert(sessionManager.getOpenSessionCount === 1)
+      assert(sessionManager.getActiveUserSessionCount === 1)
       assert(session.lastIdleTime > 0)
 
       val cancelOpReq = new TCancelOperationReq(resp.getOperationHandle)
       val cancelOpResp = client.CancelOperation(cancelOpReq)
       assert(cancelOpResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
-      assert(sessionManager.getOpenSessionCount === 1)
+      assert(sessionManager.getActiveUserSessionCount === 1)
       assert(session.lastIdleTime === 0)
 
       lastAccessTime = session.lastAccessTime
@@ -569,7 +569,7 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
         assert(session.lastAccessTime > lastAccessTime)
       }
       info("session is terminated")
-      assert(sessionManager.getOpenSessionCount === 0)
+      assert(sessionManager.getActiveUserSessionCount === 0)
     }
   }
 
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index c7eee1503..2968c4f96 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -66,8 +66,11 @@ abstract class ServiceDiscovery(

   // stop the server genteelly
   def stopGracefully(isLost: Boolean = false): Unit = {
-    while (fe.be.sessionManager.getOpenSessionCount > 0) {
-      info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
+    var activeSessionCount = fe.be.sessionManager.getActiveUserSessionCount
+    while (activeSessionCount > 0) {
+      info(s"$activeSessionCount connection(s) are active, delay shutdown")
       Thread.sleep(TimeUnit.SECONDS.toMillis(10))
+      // Refresh the count each round, otherwise the loop would never terminate.
+      activeSessionCount = fe.be.sessionManager.getActiveUserSessionCount
     }
     isServerLost.set(isLost)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index c5ef66f5e..9628613a6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -187,7 +187,7 @@ class KyuubiSyncThriftClient private (
     val req = new TOpenSessionReq(protocol)
     req.setUsername(user)
     req.setPassword(password)
-    req.setConfiguration(configs.asJava)
+    req.setConfiguration((configs ++ Map(KYUUBI_SESSION_ALIVE_PROBE -> 
"false")).asJava)
     val resp = withLockAcquired(OpenSession(req))
     ThriftUtils.verifyTStatus(resp.getStatus)
     _remoteSessionHandle = resp.getSessionHandle
@@ -207,7 +207,8 @@ class KyuubiSyncThriftClient private (
         req.setConfiguration((configs ++ Map(
           KyuubiConf.SESSION_NAME.key -> sessionName,
           KYUUBI_SESSION_HANDLE_KEY -> UUID.randomUUID().toString,
-          KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "")).asJava)
+          KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "",
+          KYUUBI_SESSION_ALIVE_PROBE -> "true")).asJava)
         val resp = aliveProbeClient.OpenSession(req)
         ThriftUtils.verifyTStatus(resp.getStatus)
         _aliveProbeSessionHandle = resp.getSessionHandle
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index 928bb207a..0954f8828 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -109,7 +109,7 @@ private[v1] class SessionsResource extends 
ApiRequestContext with Logging {
   @GET
   @Path("count")
   def sessionCount(): SessionOpenCount = {
-    new SessionOpenCount(sessionManager.getOpenSessionCount)
+    new SessionOpenCount(sessionManager.getActiveUserSessionCount)
   }
 
   @ApiResponse(
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
index 19f403987..e9ce2adc9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
@@ -70,4 +70,6 @@ abstract class KyuubiSession(
     ms.decCount(MetricRegistry.name(CONN_OPEN, user, sessionType.toString))
     ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
   }
+
+  override val isForAliveProbe: Boolean = false
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 2297cc02c..9edc8218e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -287,7 +287,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
 
   override def start(): Unit = synchronized {
     MetricsSystem.tracing { ms =>
-      ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
+      ms.registerGauge(CONN_OPEN, getActiveUserSessionCount, 0)
       ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
       ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
       ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index fd4cca329..f3287170a 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -525,7 +525,7 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
     val sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
     val kyuubiInstance = fe.connectionUrl
 
-    assert(sessionManager.getOpenSessionCount === 0)
+    assert(sessionManager.getActiveUserSessionCount === 0)
     val batchId1 = UUID.randomUUID().toString
     val batchId2 = UUID.randomUUID().toString
 
@@ -585,7 +585,7 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
 
     val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
     restFe.recoverBatchSessions()
-    assert(sessionManager.getOpenSessionCount === 2)
+    assert(sessionManager.getActiveUserSessionCount === 2)
 
     val sessionHandle1 = SessionHandle.fromUUID(batchId1)
     val sessionHandle2 = SessionHandle.fromUUID(batchId2)

Reply via email to