This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new f4442c0de [KYUUBI #6172][TASK][EASY] Support to interrupt the thrift 
request immediately after marking the engine not alive
f4442c0de is described below

commit f4442c0de56b5268606ca897ae132729a9e2a9de
Author: wangjunbo <[email protected]>
AuthorDate: Thu May 9 08:46:16 2024 -0700

    [KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request 
immediately after marking the engine not alive
    
    Support to interrupt the thrift request immediately after marking the 
engine not alive
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6172
    
    ## Describe Your Solution ๐Ÿ”ง
    
    
https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala#L103-L110
    
    When probe fails and exceeds engineAliveTimeout, not interrupt the thrift 
request immediately, only marked `remoteEngineBroken` and wait next 
`engineAliveProbeInterval` to interrupt.
    Unit test `KyuubiOperationPerConnectionSuite` assert timeout 3s.
    
    
https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala#L344-L346
    
    Exception log
    ```
    03:25:15.125 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The 
engine[local-1714879506640] alive probe fails
    org.apache.kyuubi.shaded.thrift.transport.TTransportException: Socket is 
closed by peer.
            ...
    03:25:16.126 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The 
engine[local-1714879506640] alive probe fails
    org.apache.kyuubi.shaded.thrift.transport.TTransportException: 
java.net.SocketException: Broken pipe (Write failed)
            ...
    03:25:16.126 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11886 ERROR KyuubiSyncThriftClient: Mark the 
engine[local-1714879506640] not alive with no recent alive probe success: 2001 
ms exceeds timeout 1000 ms
    ```
    
    Success log
    ```
    16:57:46.859 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: The 
engine[local-1715101059872] alive probe fails
    ...
    16:57:46.860 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11609 ERROR KyuubiSyncThriftClient: Mark the 
engine[local-1715101059872] not alive with no recent alive probe success: 1001 
ms exceeds timeout 1000 ms
    16:57:47.860 
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 
08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: Removing Clients for 
TSessionHandle(sessionId:THandleIdentifier(guid:9D AA D5 C2 9B E4 43 D7 BE 81 
D0 99 EA 5B 9E 37, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38))
    ```
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6375 from beryllw/kyuubi_6172.
    
    Closes #6172
    
    991798b86 [wangjunbo] [KYUUBI #6172][TASK][EASY] Support to interrupt the 
thrift request immediately after marking the engine not alive
    
    Authored-by: wangjunbo <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 1fc2b3519aa00269508cf064fef4dd0593cb986a)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 3516c3771..9691852b9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -85,6 +85,23 @@ class KyuubiSyncThriftClient private (
   private def startEngineAliveProbe(): Unit = {
     engineAliveThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
       "engine-alive-probe-" + _aliveProbeSessionHandle)
+
+    def closeClient(): Unit = {
+      warn(s"Removing Clients for ${_remoteSessionHandle}")
+      Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol 
=>
+        Utils.tryLogNonFatalError {
+          if (tProtocol.getTransport.isOpen) {
+            tProtocol.getTransport.close()
+          }
+        }
+      }
+      clientClosedByAliveProbe = true
+      shutdownAsyncRequestExecutor()
+      Option(engineAliveThreadPool).foreach { pool =>
+        ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, 
TimeUnit.MILLISECONDS))
+      }
+    }
+
     val task = new Runnable {
       override def run(): Unit = {
         if (!remoteEngineBroken && !engineConnectionClosed) {
@@ -106,23 +123,12 @@ class KyuubiSyncThriftClient private (
                   error(s"Mark the engine[$engineIdStr] not alive with no 
recent alive probe" +
                     s" success: ${now - engineLastAlive} ms exceeds timeout 
$engineAliveTimeout ms")
                   remoteEngineBroken = true
+                  closeClient()
                 }
             }
           }
         } else {
-          warn(s"Removing Clients for ${_remoteSessionHandle}")
-          Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { 
tProtocol =>
-            Utils.tryLogNonFatalError {
-              if (tProtocol.getTransport.isOpen) {
-                tProtocol.getTransport.close()
-              }
-            }
-          }
-          clientClosedByAliveProbe = true
-          shutdownAsyncRequestExecutor()
-          Option(engineAliveThreadPool).foreach { pool =>
-            ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, 
TimeUnit.MILLISECONDS))
-          }
+          closeClient()
         }
       }
     }

Reply via email to