This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 5ceb641c1 [KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue
5ceb641c1 is described below
commit 5ceb641c1958eb02516d9e49cc5a7777d087de2f
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jun 17 18:28:45 2024 +0800
[KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue
# :mag: Description
## Issue References 🔗
Address comments
https://github.com/apache/kyuubi/issues/4847#issuecomment-2114284381
Currently, `checkEngineConnectionAlive` uses the session client to send a
`TGetInfoType` request to the engine, which prevents the user session from ever becoming idle and timing out.
## Describe Your Solution 🔧
We shall reuse the alive probe client.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
Pass the current UT.
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6468 from turboFei/engine_alive_check.
Closes #4847
e2368b206 [Wang, Fei] reuse
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 83de6cf11e23b65adc680427b5efca6931497993)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/client/KyuubiSyncThriftClient.scala | 11 ++++---
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 38 ++--------------------
2 files changed, 8 insertions(+), 41 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 7133131d7..d34458c64 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -63,7 +63,8 @@ class KyuubiSyncThriftClient private (
private[kyuubi] def remoteSessionHandle: TSessionHandle =
_remoteSessionHandle
@volatile private var _aliveProbeSessionHandle: TSessionHandle = _
- @volatile private var remoteEngineBroken: Boolean = false
+ @volatile private var _remoteEngineBroken: Boolean = false
+ private[kyuubi] def remoteEngineBroken: Boolean = _remoteEngineBroken
@volatile private var clientClosedByAliveProbe: Boolean = false
private val engineAliveProbeClient = engineAliveProbeProtocol.map(new
TCLIService.Client(_))
private var engineAliveThreadPool: ScheduledExecutorService = _
@@ -111,7 +112,7 @@ class KyuubiSyncThriftClient private (
val task = new Runnable {
override def run(): Unit = {
- if (!remoteEngineBroken && !engineConnectionClosed) {
+ if (!_remoteEngineBroken && !engineConnectionClosed) {
engineAliveProbeClient.foreach { client =>
val tGetInfoReq = new TGetInfoReq()
tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@@ -120,7 +121,7 @@ class KyuubiSyncThriftClient private (
try {
client.GetInfo(tGetInfoReq).getInfoValue.getStringValue
engineLastAlive = System.currentTimeMillis()
- remoteEngineBroken = false
+ _remoteEngineBroken = false
} catch {
case e: Throwable =>
val engineIdStr = engineId.getOrElse("")
@@ -129,7 +130,7 @@ class KyuubiSyncThriftClient private (
if (now - engineLastAlive > engineAliveTimeout) {
error(s"Mark the engine[$engineIdStr] not alive with no
recent alive probe" +
s" success: ${now - engineLastAlive} ms exceeds timeout
$engineAliveTimeout ms")
- remoteEngineBroken = true
+ _remoteEngineBroken = true
closeClient()
}
}
@@ -165,7 +166,7 @@ class KyuubiSyncThriftClient private (
val task = asyncRequestExecutor.submit(() => {
val resp = block
- remoteEngineBroken = false
+ _remoteEngineBroken = false
resp
})
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index c0634455f..22cd4dc8a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -119,7 +119,6 @@ class KyuubiSessionImpl(
super.open()
runOperation(launchEngineOp)
- engineLastAlive = System.currentTimeMillis()
}
def getEngineNode: Option[ServiceNodeInfo] = {
@@ -315,41 +314,8 @@ class KyuubiSessionImpl(
}
}
- @volatile private var engineLastAlive: Long = _
- private val engineAliveTimeout =
sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
- private val aliveProbeEnabled =
sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
- private val engineAliveMaxFailCount =
sessionConf.get(KyuubiConf.ENGINE_ALIVE_MAX_FAILURES)
- @volatile private var engineAliveFailCount = 0
-
def checkEngineConnectionAlive(): Boolean = {
- try {
- if (Option(client).exists(_.engineConnectionClosed)) return false
- if (!aliveProbeEnabled) return true
- getInfo(TGetInfoType.CLI_DBMS_VER)
- engineLastAlive = System.currentTimeMillis()
- engineAliveFailCount = 0
- true
- } catch {
- case e: Throwable =>
- val now = System.currentTimeMillis()
- engineAliveFailCount = engineAliveFailCount + 1
- if (now - engineLastAlive > engineAliveTimeout &&
- engineAliveFailCount >= engineAliveMaxFailCount) {
- error(s"The engineRef[${engine.getEngineRefId}] is marked as not
alive "
- + s"due to a lack of recent successful alive probes. "
- + s"The time since last successful probe: "
- + s"${now - engineLastAlive} ms exceeds the timeout of
$engineAliveTimeout ms. "
- + s"The engine has failed $engineAliveFailCount times, "
- + s"surpassing the maximum failure count of
$engineAliveMaxFailCount.")
- false
- } else {
- warn(
- s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
- s"${now - engineLastAlive} ms exceeds timeout
$engineAliveTimeout ms, " +
- s"and has failed $engineAliveFailCount times.",
- e)
- true
- }
- }
+ if (Option(client).exists(_.engineConnectionClosed)) return false
+ !client.remoteEngineBroken
}
}