This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new e8c38f60b [KYUUBI #4746] Do not recreate async request executor if has 
been shutdown
e8c38f60b is described below

commit e8c38f60b45f11ac9b613a43ab98095d8300e7ab
Author: fwang12 <[email protected]>
AuthorDate: Fri Apr 21 17:57:01 2023 +0800

    [KYUUBI #4746] Do not recreate async request executor if has been shutdown
    
    ### _Why are the changes needed?_
    
    After #4480, each KyuubiSyncThriftClient should own exactly one 
asyncRequestExecutor, so it must not be recreated once it has been shut down.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4746 from turboFei/engine_alive.
    
    Closes #4746
    
    9d2fa5fe9 [fwang12] fix typo
    d90263b65 [fwang12] check
    38aefdf39 [fwang12] close protocol first
    6eb61780f [fwang12] do not renew executor
    64e66a857 [fwang12] close protocol first
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 23 ++++++++++------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 587fd5756..c02851715 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -64,7 +64,9 @@ class KyuubiSyncThriftClient private (
   private var engineAliveThreadPool: ScheduledExecutorService = _
   @volatile private var engineLastAlive: Long = _
 
-  private var asyncRequestExecutor: ExecutorService = _
+  private lazy val asyncRequestExecutor: ExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      "async-request-executor-" + SessionHandle(_remoteSessionHandle))
 
   @VisibleForTesting
   @volatile private[kyuubi] var asyncRequestInterrupted: Boolean = false
@@ -72,11 +74,6 @@ class KyuubiSyncThriftClient private (
   @VisibleForTesting
   private[kyuubi] def getEngineAliveProbeProtocol: Option[TProtocol] = 
engineAliveProbeProtocol
 
-  private def newAsyncRequestExecutor(): ExecutorService = {
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor(
-      "async-request-executor-" + _remoteSessionHandle)
-  }
-
   private def shutdownAsyncRequestExecutor(): Unit = {
     
Option(asyncRequestExecutor).filterNot(_.isShutdown).foreach(ThreadUtils.shutdown(_))
     asyncRequestInterrupted = true
@@ -109,7 +106,6 @@ class KyuubiSyncThriftClient private (
             }
           }
         } else {
-          shutdownAsyncRequestExecutor()
           warn(s"Removing Clients for ${_remoteSessionHandle}")
           Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { 
tProtocol =>
             Utils.tryLogNonFatalError {
@@ -117,10 +113,11 @@ class KyuubiSyncThriftClient private (
                 tProtocol.getTransport.close()
               }
             }
-            clientClosedOnEngineBroken = true
-            Option(engineAliveThreadPool).foreach { pool =>
-              ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, 
TimeUnit.MILLISECONDS))
-            }
+          }
+          clientClosedOnEngineBroken = true
+          shutdownAsyncRequestExecutor()
+          Option(engineAliveThreadPool).foreach { pool =>
+            ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, 
TimeUnit.MILLISECONDS))
           }
         }
       }
@@ -144,8 +141,8 @@ class KyuubiSyncThriftClient private (
   }
 
   private def withLockAcquiredAsyncRequest[T](block: => T): T = 
withLockAcquired {
-    if (asyncRequestExecutor == null || asyncRequestExecutor.isShutdown) {
-      asyncRequestExecutor = newAsyncRequestExecutor()
+    if (asyncRequestExecutor.isShutdown) {
+      throw KyuubiSQLException.connectionDoesNotExist()
     }
 
     val task = asyncRequestExecutor.submit(() => {

Reply via email to