This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8d424ef43 [KYUUBI #4746] Do not recreate async request executor if has
been shutdown
8d424ef43 is described below
commit 8d424ef435dc198f16905bcdb7d4b722f6577466
Author: fwang12 <[email protected]>
AuthorDate: Fri Apr 21 17:57:01 2023 +0800
[KYUUBI #4746] Do not recreate async request executor if has been shutdown
### _Why are the changes needed?_
After #4480, each KyuubiSyncThriftClient should have exactly one
asyncRequestExecutor, so it must not be recreated once it has been shut down
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4746 from turboFei/engine_alive.
Closes #4746
9d2fa5fe9 [fwang12] fix typo
d90263b65 [fwang12] check
38aefdf39 [fwang12] close protocol first
6eb61780f [fwang12] do not renew executor
64e66a857 [fwang12] close protocol first
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/client/KyuubiSyncThriftClient.scala | 23 ++++++++++------------
1 file changed, 10 insertions(+), 13 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 587fd5756..c02851715 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -64,7 +64,9 @@ class KyuubiSyncThriftClient private (
private var engineAliveThreadPool: ScheduledExecutorService = _
@volatile private var engineLastAlive: Long = _
- private var asyncRequestExecutor: ExecutorService = _
+ private lazy val asyncRequestExecutor: ExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ "async-request-executor-" + SessionHandle(_remoteSessionHandle))
@VisibleForTesting
@volatile private[kyuubi] var asyncRequestInterrupted: Boolean = false
@@ -72,11 +74,6 @@ class KyuubiSyncThriftClient private (
@VisibleForTesting
private[kyuubi] def getEngineAliveProbeProtocol: Option[TProtocol] =
engineAliveProbeProtocol
- private def newAsyncRequestExecutor(): ExecutorService = {
- ThreadUtils.newDaemonSingleThreadScheduledExecutor(
- "async-request-executor-" + _remoteSessionHandle)
- }
-
private def shutdownAsyncRequestExecutor(): Unit = {
Option(asyncRequestExecutor).filterNot(_.isShutdown).foreach(ThreadUtils.shutdown(_))
asyncRequestInterrupted = true
@@ -109,7 +106,6 @@ class KyuubiSyncThriftClient private (
}
}
} else {
- shutdownAsyncRequestExecutor()
warn(s"Removing Clients for ${_remoteSessionHandle}")
Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach {
tProtocol =>
Utils.tryLogNonFatalError {
@@ -117,10 +113,11 @@ class KyuubiSyncThriftClient private (
tProtocol.getTransport.close()
}
}
- clientClosedOnEngineBroken = true
- Option(engineAliveThreadPool).foreach { pool =>
- ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval,
TimeUnit.MILLISECONDS))
- }
+ }
+ clientClosedOnEngineBroken = true
+ shutdownAsyncRequestExecutor()
+ Option(engineAliveThreadPool).foreach { pool =>
+ ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval,
TimeUnit.MILLISECONDS))
}
}
}
@@ -144,8 +141,8 @@ class KyuubiSyncThriftClient private (
}
private def withLockAcquiredAsyncRequest[T](block: => T): T =
withLockAcquired {
- if (asyncRequestExecutor == null || asyncRequestExecutor.isShutdown) {
- asyncRequestExecutor = newAsyncRequestExecutor()
+ if (asyncRequestExecutor.isShutdown) {
+ throw KyuubiSQLException.connectionDoesNotExist()
}
val task = asyncRequestExecutor.submit(() => {