This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 47562b464 [KYUUBI #4847][FOLLOWUP] Close the session immediately when
engine connection closed
47562b464 is described below
commit 47562b464bd4b7a384ee219aabc5fe7b4e1aa364
Author: fwang12 <[email protected]>
AuthorDate: Mon Jul 10 12:28:10 2023 +0800
[KYUUBI #4847][FOLLOWUP] Close the session immediately when engine
connection closed
### _Why are the changes needed?_
If the session between kyuubi server and kyuubi engine has been inactive,
we should close the kyuubi session as well.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
Closes #5031 from turboFei/close_session_inactive.
Closes #4847
2eea080b5 [fwang12] fix
964ead778 [fwang12] check engine connection alive
62642f734 [fwang12] save
Authored-by: fwang12 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/client/KyuubiSyncThriftClient.scala | 6 ++++--
.../scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala | 3 ++-
.../scala/org/apache/kyuubi/session/KyuubiSessionManager.scala | 10 +++++++---
3 files changed, 13 insertions(+), 6 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 6cace4452..ad7191c09 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -52,6 +52,8 @@ class KyuubiSyncThriftClient private (
@volatile private var _engineUrl: Option[String] = _
@volatile private var _engineName: Option[String] = _
+ private[kyuubi] def engineConnectionClosed: Boolean =
!protocol.getTransport.isOpen
+
private val lock = new ReentrantLock()
// Visible for testing.
@@ -84,7 +86,7 @@ class KyuubiSyncThriftClient private (
"engine-alive-probe-" + _aliveProbeSessionHandle)
val task = new Runnable {
override def run(): Unit = {
- if (!remoteEngineBroken && protocol.getTransport.isOpen) {
+ if (!remoteEngineBroken && !engineConnectionClosed) {
engineAliveProbeClient.foreach { client =>
val tGetInfoReq = new TGetInfoReq()
tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@@ -134,7 +136,7 @@ class KyuubiSyncThriftClient private (
* Lock every rpc call to send them sequentially
*/
private def withLockAcquired[T](block: => T): T =
Utils.withLockRequired(lock) {
- if (!protocol.getTransport.isOpen) {
+ if (engineConnectionClosed) {
throw KyuubiSQLException.connectionDoesNotExist()
}
block
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 237eb3ca6..809be86f3 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -291,8 +291,9 @@ class KyuubiSessionImpl(
var engineAliveMaxFailCount = 3
var engineAliveFailCount = 0
- def checkEngineAlive(): Boolean = {
+ def checkEngineConnectionAlive(): Boolean = {
try {
+ if (Option(client).exists(_.engineConnectionClosed)) return false
if (!aliveProbeEnabled) return true
getInfo(TGetInfoType.CLI_DBMS_VER)
engineLastAlive = System.currentTimeMillis()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 6e74bedbf..d2547bca9 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -63,7 +63,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
private var batchLimiter: Option[SessionLimiter] = None
lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
- private val engineAliveChecker =
+ private val engineConnectionAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
override def initialize(conf: KyuubiConf): Unit = {
@@ -350,7 +350,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
val checkTask: Runnable = () => {
allSessions().foreach { session =>
- if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
+ if
(!session.asInstanceOf[KyuubiSessionImpl].checkEngineConnectionAlive()) {
try {
closeSession(session.handle)
logger.info(s"The session ${session.handle} has been closed " +
@@ -362,7 +362,11 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
}
}
- engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval,
TimeUnit.MILLISECONDS)
+ engineConnectionAliveChecker.scheduleWithFixedDelay(
+ checkTask,
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS)
}
}