This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47562b464 [KYUUBI #4847][FOLLOWUP] Close the session immediately when 
engine connection closed
47562b464 is described below

commit 47562b464bd4b7a384ee219aabc5fe7b4e1aa364
Author: fwang12 <[email protected]>
AuthorDate: Mon Jul 10 12:28:10 2023 +0800

    [KYUUBI #4847][FOLLOWUP] Close the session immediately when engine 
connection closed
    
    ### _Why are the changes needed?_
    
    If the session between kyuubi server and kyuubi engine has been inactive, 
we should close the kyuubi session as well.
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    Closes #5031 from turboFei/close_session_inactive.
    
    Closes #4847
    
    2eea080b5 [fwang12] fix
    964ead778 [fwang12] check engine connection alive
    62642f734 [fwang12] save
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala      |  6 ++++--
 .../scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala    |  3 ++-
 .../scala/org/apache/kyuubi/session/KyuubiSessionManager.scala | 10 +++++++---
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 6cace4452..ad7191c09 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -52,6 +52,8 @@ class KyuubiSyncThriftClient private (
   @volatile private var _engineUrl: Option[String] = _
   @volatile private var _engineName: Option[String] = _
 
+  private[kyuubi] def engineConnectionClosed: Boolean = 
!protocol.getTransport.isOpen
+
   private val lock = new ReentrantLock()
 
   // Visible for testing.
@@ -84,7 +86,7 @@ class KyuubiSyncThriftClient private (
       "engine-alive-probe-" + _aliveProbeSessionHandle)
     val task = new Runnable {
       override def run(): Unit = {
-        if (!remoteEngineBroken && protocol.getTransport.isOpen) {
+        if (!remoteEngineBroken && !engineConnectionClosed) {
           engineAliveProbeClient.foreach { client =>
             val tGetInfoReq = new TGetInfoReq()
             tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@@ -134,7 +136,7 @@ class KyuubiSyncThriftClient private (
    * Lock every rpc call to send them sequentially
    */
   private def withLockAcquired[T](block: => T): T = 
Utils.withLockRequired(lock) {
-    if (!protocol.getTransport.isOpen) {
+    if (engineConnectionClosed) {
       throw KyuubiSQLException.connectionDoesNotExist()
     }
     block
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 237eb3ca6..809be86f3 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -291,8 +291,9 @@ class KyuubiSessionImpl(
   var engineAliveMaxFailCount = 3
   var engineAliveFailCount = 0
 
-  def checkEngineAlive(): Boolean = {
+  def checkEngineConnectionAlive(): Boolean = {
     try {
+      if (Option(client).exists(_.engineConnectionClosed)) return false
       if (!aliveProbeEnabled) return true
       getInfo(TGetInfoType.CLI_DBMS_VER)
       engineLastAlive = System.currentTimeMillis()
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 6e74bedbf..d2547bca9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -63,7 +63,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   private var batchLimiter: Option[SessionLimiter] = None
   lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
 
-  private val engineAliveChecker =
+  private val engineConnectionAliveChecker =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
 
   override def initialize(conf: KyuubiConf): Unit = {
@@ -350,7 +350,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
     val checkTask: Runnable = () => {
       allSessions().foreach { session =>
-        if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
+        if 
(!session.asInstanceOf[KyuubiSessionImpl].checkEngineConnectionAlive()) {
           try {
             closeSession(session.handle)
             logger.info(s"The session ${session.handle} has been closed " +
@@ -362,7 +362,11 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         }
       }
     }
-    engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, 
TimeUnit.MILLISECONDS)
+    engineConnectionAliveChecker.scheduleWithFixedDelay(
+      checkTask,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
   }
 
 }

Reply via email to