This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 494c65bcc [KYUUBI #6172][TASK][EASY] Support to interrupt the thrift
request immediately after marking the engine not alive
494c65bcc is described below
commit 494c65bcc590b61634eabe208dd090d3eca685f2
Author: wangjunbo <[email protected]>
AuthorDate: Thu May 9 08:46:16 2024 -0700
[KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request
immediately after marking the engine not alive
Support to interrupt the thrift request immediately after marking the
engine not alive
# :mag: Description
## Issue References
This pull request fixes #6172
## Describe Your Solution
https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala#L103-L110
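When the alive probe fails and the time since the last successful probe
exceeds `engineAliveTimeout`, the client does not interrupt the Thrift request
immediately. It only marks `remoteEngineBroken` and waits for the next
`engineAliveProbeInterval` tick, where the `else` branch closes the clients and
thereby interrupts the request. This patch moves that cleanup into a
`closeClient()` helper and calls it right after marking the engine not alive.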
The unit test `KyuubiOperationPerConnectionSuite` asserts with a 3 s timeout:
https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala#L344-L346
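To make the extra delay concrete, here is a minimal Scala sketch of a simplified, hypothetical model of the probe loop (not Kyuubi code). It assumes probes fire exactly every `intervalMs`, every probe after `lastSuccessMs` fails, and the engine counts as timed out once `now - lastSuccessMs > timeoutMs`; `simulate`, `ProbeOutcome` and `closeImmediately` are illustrative names. It reports the tick at which the engine is marked not alive and the tick at which the clients are closed, for the old flow and the new one.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified model of the engine alive probe loop (not Kyuubi code).
// Assumptions: probes run exactly every intervalMs, every probe after
// lastSuccessMs fails, and "timed out" means now - lastSuccessMs > timeoutMs.
final case class ProbeOutcome(markedBrokenAt: Long, closedAt: Long)

def simulate(
    intervalMs: Long,
    timeoutMs: Long,
    lastSuccessMs: Long,
    closeImmediately: Boolean): ProbeOutcome = {
  @tailrec
  def tick(now: Long, markedAt: Option[Long]): ProbeOutcome = markedAt match {
    // Engine already marked broken: the else branch closes the clients on this tick.
    case Some(marked) => ProbeOutcome(marked, now)
    // Probe failed and the timeout is exceeded: mark the engine broken.
    case None if now - lastSuccessMs > timeoutMs =>
      if (closeImmediately) ProbeOutcome(now, now) // new flow: close right away
      else tick(now + intervalMs, Some(now)) // old flow: wait for the next tick
    // Probe failed but still within the timeout: keep probing.
    case None => tick(now + intervalMs, None)
  }
  tick(lastSuccessMs + intervalMs, None)
}

// Old flow: marked broken at 2000 ms, clients closed one interval later.
println(simulate(1000, 1000, 0, closeImmediately = false)) // ProbeOutcome(2000,3000)
// New flow: marked broken and closed on the same tick.
println(simulate(1000, 1000, 0, closeImmediately = true)) // ProbeOutcome(2000,2000)
```

Under these assumptions, with a 1000 ms interval and a 1000 ms timeout the old flow keeps the Thrift request waiting for one extra interval after the engine is already marked not alive, and that extra interval counts against the test's 3 s budget.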
Exception log
```
03:25:15.125
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The
engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException: Socket is
closed by peer.
...
03:25:16.126
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The
engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException:
java.net.SocketException: Broken pipe (Write failed)
...
03:25:16.126
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6
16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11886 ERROR KyuubiSyncThriftClient: Mark the
engine[local-1714879506640] not alive with no recent alive probe success: 2001
ms exceeds timeout 1000 ms
```
Success log
```
16:57:46.859
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: The
engine[local-1715101059872] alive probe fails
...
16:57:46.860
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11609 ERROR KyuubiSyncThriftClient: Mark the
engine[local-1715101059872] not alive with no recent alive probe success: 1001
ms exceeds timeout 1000 ms
16:57:47.860
engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83
6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7
08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: Removing Clients for
TSessionHandle(sessionId:THandleIdentifier(guid:9D AA D5 C2 9B E4 43 D7 BE 81
D0 99 EA 5B 9E 37, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38))
```
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
Closes #6375 from beryllw/kyuubi_6172.
Closes #6172
991798b86 [wangjunbo] [KYUUBI #6172][TASK][EASY] Support to interrupt the
thrift request immediately after marking the engine not alive
Authored-by: wangjunbo <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 1fc2b3519aa00269508cf064fef4dd0593cb986a)
Signed-off-by: Wang, Fei <[email protected]>
---
.../kyuubi/client/KyuubiSyncThriftClient.scala | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 755e55d6d..c5ef66f5e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -87,6 +87,23 @@ class KyuubiSyncThriftClient private (
private def startEngineAliveProbe(): Unit = {
engineAliveThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"engine-alive-probe-" + _aliveProbeSessionHandle)
+
+ def closeClient(): Unit = {
+ warn(s"Removing Clients for ${_remoteSessionHandle}")
+ Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
+ Utils.tryLogNonFatalError {
+ if (tProtocol.getTransport.isOpen) {
+ tProtocol.getTransport.close()
+ }
+ }
+ }
+ clientClosedByAliveProbe = true
+ shutdownAsyncRequestExecutor()
+ Option(engineAliveThreadPool).foreach { pool =>
+ ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
+ }
+ }
+
val task = new Runnable {
override def run(): Unit = {
if (!remoteEngineBroken && !engineConnectionClosed) {
@@ -108,23 +125,12 @@ class KyuubiSyncThriftClient private (
error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
remoteEngineBroken = true
+ closeClient()
}
}
}
} else {
- warn(s"Removing Clients for ${_remoteSessionHandle}")
- Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
- Utils.tryLogNonFatalError {
- if (tProtocol.getTransport.isOpen) {
- tProtocol.getTransport.close()
- }
- }
- }
- clientClosedByAliveProbe = true
- shutdownAsyncRequestExecutor()
- Option(engineAliveThreadPool).foreach { pool =>
- ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
- }
+ closeClient()
}
}
}
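The shape of this refactor can be sketched with a plain `ScheduledExecutorService`. The block below is a hypothetical, simplified stand-in, not the Kyuubi client: `FakeTransport`, `AliveProbeSketch`, `brokenTick` and `closedTick` are illustrative names, the probe is an injected function, and there is no Thrift protocol or async request executor. It shows one `closeClient()` helper shared by the "timeout exceeded" branch and the `else` branch, so the transports are closed on the same tick that marks the engine broken.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for a Thrift transport (not the Thrift API).
final class FakeTransport {
  @volatile var isOpen: Boolean = true
  def close(): Unit = { isOpen = false }
}

// Simplified, illustrative model of the alive probe after the refactor.
final class AliveProbeSketch(intervalMs: Long, timeoutMs: Long, probe: () => Boolean) {
  val transports: Seq[FakeTransport] = Seq(new FakeTransport, new FakeTransport)
  val closed = new CountDownLatch(1)
  @volatile var engineConnectionClosed: Boolean = false
  @volatile var brokenTick: Int = -1
  @volatile var closedTick: Int = -1

  private val pool = Executors.newSingleThreadScheduledExecutor()
  @volatile private var remoteEngineBroken = false
  @volatile private var lastAlive = System.currentTimeMillis()
  private var tick = 0 // only touched by the single probe thread

  // Counterpart of closeClient(): close open transports, then stop probing.
  // Both branches of the probe task call this one helper.
  private def closeClient(): Unit = {
    transports.filter(_.isOpen).foreach(_.close())
    closedTick = tick
    pool.shutdown() // the current run finishes; later periodic runs are cancelled
    closed.countDown()
  }

  def start(): Unit = {
    pool.scheduleWithFixedDelay(() => {
      tick += 1
      if (!remoteEngineBroken && !engineConnectionClosed) {
        if (probe()) {
          lastAlive = System.currentTimeMillis()
        } else if (System.currentTimeMillis() - lastAlive > timeoutMs) {
          remoteEngineBroken = true
          brokenTick = tick
          closeClient() // the fix: close now instead of on the next tick
        }
      } else {
        closeClient() // connection closed elsewhere: same cleanup path
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}
```

For example, `new AliveProbeSketch(intervalMs = 50, timeoutMs = 100, probe = () => false)` followed by `start()` releases `closed` shortly after the timeout is exceeded, with `brokenTick == closedTick`. The diff stops the pool with `ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))`; the sketch uses plain `ExecutorService.shutdown()` instead, which cancels later periodic runs without waiting for termination.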