This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 5ceb641c1 [KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue
5ceb641c1 is described below

commit 5ceb641c1958eb02516d9e49cc5a7777d087de2f
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jun 17 18:28:45 2024 +0800

    [KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue
    
    # :mag: Description
    ## Issue References 🔗
    
    Addresses the review comment at https://github.com/apache/kyuubi/issues/4847#issuecomment-2114284381
    
    Currently, `checkEngineConnectionAlive` uses the session client to send a `TGetInfoType` request to the engine, which prevents the user session from ever becoming idle and timing out.
    
    ## Describe Your Solution 🔧
    
    Reuse the engine alive probe client for this check instead.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    Passes the existing unit tests.
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6468 from turboFei/engine_alive_check.
    
    Closes #4847
    
    e2368b206 [Wang, Fei] reuse
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 83de6cf11e23b65adc680427b5efca6931497993)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 11 ++++---
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 38 ++--------------------
 2 files changed, 8 insertions(+), 41 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 7133131d7..d34458c64 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -63,7 +63,8 @@ class KyuubiSyncThriftClient private (
   private[kyuubi] def remoteSessionHandle: TSessionHandle = 
_remoteSessionHandle
 
   @volatile private var _aliveProbeSessionHandle: TSessionHandle = _
-  @volatile private var remoteEngineBroken: Boolean = false
+  @volatile private var _remoteEngineBroken: Boolean = false
+  private[kyuubi] def remoteEngineBroken: Boolean = _remoteEngineBroken
   @volatile private var clientClosedByAliveProbe: Boolean = false
   private val engineAliveProbeClient = engineAliveProbeProtocol.map(new 
TCLIService.Client(_))
   private var engineAliveThreadPool: ScheduledExecutorService = _
@@ -111,7 +112,7 @@ class KyuubiSyncThriftClient private (
 
     val task = new Runnable {
       override def run(): Unit = {
-        if (!remoteEngineBroken && !engineConnectionClosed) {
+        if (!_remoteEngineBroken && !engineConnectionClosed) {
           engineAliveProbeClient.foreach { client =>
             val tGetInfoReq = new TGetInfoReq()
             tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@@ -120,7 +121,7 @@ class KyuubiSyncThriftClient private (
             try {
               client.GetInfo(tGetInfoReq).getInfoValue.getStringValue
               engineLastAlive = System.currentTimeMillis()
-              remoteEngineBroken = false
+              _remoteEngineBroken = false
             } catch {
               case e: Throwable =>
                 val engineIdStr = engineId.getOrElse("")
@@ -129,7 +130,7 @@ class KyuubiSyncThriftClient private (
                 if (now - engineLastAlive > engineAliveTimeout) {
                   error(s"Mark the engine[$engineIdStr] not alive with no 
recent alive probe" +
                     s" success: ${now - engineLastAlive} ms exceeds timeout 
$engineAliveTimeout ms")
-                  remoteEngineBroken = true
+                  _remoteEngineBroken = true
                   closeClient()
                 }
             }
@@ -165,7 +166,7 @@ class KyuubiSyncThriftClient private (
 
     val task = asyncRequestExecutor.submit(() => {
       val resp = block
-      remoteEngineBroken = false
+      _remoteEngineBroken = false
       resp
     })
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index c0634455f..22cd4dc8a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -119,7 +119,6 @@ class KyuubiSessionImpl(
     super.open()
 
     runOperation(launchEngineOp)
-    engineLastAlive = System.currentTimeMillis()
   }
 
   def getEngineNode: Option[ServiceNodeInfo] = {
@@ -315,41 +314,8 @@ class KyuubiSessionImpl(
     }
   }
 
-  @volatile private var engineLastAlive: Long = _
-  private val engineAliveTimeout = 
sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
-  private val aliveProbeEnabled = 
sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
-  private val engineAliveMaxFailCount = 
sessionConf.get(KyuubiConf.ENGINE_ALIVE_MAX_FAILURES)
-  @volatile private var engineAliveFailCount = 0
-
   def checkEngineConnectionAlive(): Boolean = {
-    try {
-      if (Option(client).exists(_.engineConnectionClosed)) return false
-      if (!aliveProbeEnabled) return true
-      getInfo(TGetInfoType.CLI_DBMS_VER)
-      engineLastAlive = System.currentTimeMillis()
-      engineAliveFailCount = 0
-      true
-    } catch {
-      case e: Throwable =>
-        val now = System.currentTimeMillis()
-        engineAliveFailCount = engineAliveFailCount + 1
-        if (now - engineLastAlive > engineAliveTimeout &&
-          engineAliveFailCount >= engineAliveMaxFailCount) {
-          error(s"The engineRef[${engine.getEngineRefId}] is marked as not 
alive "
-            + s"due to a lack of recent successful alive probes. "
-            + s"The time since last successful probe: "
-            + s"${now - engineLastAlive} ms exceeds the timeout of 
$engineAliveTimeout ms. "
-            + s"The engine has failed $engineAliveFailCount times, "
-            + s"surpassing the maximum failure count of 
$engineAliveMaxFailCount.")
-          false
-        } else {
-          warn(
-            s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
-              s"${now - engineLastAlive} ms exceeds timeout 
$engineAliveTimeout ms, " +
-              s"and has failed $engineAliveFailCount times.",
-            e)
-          true
-        }
-    }
+    if (Option(client).exists(_.engineConnectionClosed)) return false
+    !client.remoteEngineBroken
   }
 }

Reply via email to