This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9c18a7e38 [KYUUBI #6130] Stop engine immediately after close session
for `CONNECTION` level FlinkSQLEngine
9c18a7e38 is described below
commit 9c18a7e38d3ea650d8ae3f57cd8c9600b82bea36
Author: wforget <[email protected]>
AuthorDate: Fri Mar 8 20:40:05 2024 +0800
[KYUUBI #6130] Stop engine immediately after close session for `CONNECTION`
level FlinkSQLEngine
# :mag: Description
## Issue References ๐
This pull request fixes #6130
## Describe Your Solution ๐ง
Stop engine immediately after close session for `CONNECTION` level
FlinkSQLEngine
## Types of changes :bookmark:
- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist ๐
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6132 from wForget/KYUUBI-6130.
Closes #6130
033c0fff5 [wforget] assert exception
c0ce68e35 [wforget] debug
07e0320af [wforget] add test
a3c4ae319 [wforget] [KYUUBI-6130] Stop engine immediately after close
session for `CONNECTION` level FlinkSQLEngine
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../flink/session/FlinkSQLSessionManager.scala | 26 ++++++++++++---
.../operation/FlinkEngineInitializeSuite.scala | 38 ++++++++++++++++++++--
2 files changed, 57 insertions(+), 7 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 8627e5a24..1bb82a0ef 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -24,7 +24,10 @@ import
org.apache.flink.table.gateway.api.session.SessionEnvironment
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
import org.apache.flink.table.gateway.service.context.DefaultContext
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.FlinkSQLEngine
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
@@ -35,6 +38,8 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
+ private lazy val shareLevel =
ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
+
val operationManager = new FlinkSQLOperationManager()
val sessionManager = new FlinkSessionManager(engineContext)
@@ -77,10 +82,23 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
- val fSession = super.getSessionOption(sessionHandle)
- fSession.foreach(s =>
-
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
- super.closeSession(sessionHandle)
+ try {
+ val fSession = super.getSessionOption(sessionHandle)
+ fSession.foreach(s =>
+
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
+ super.closeSession(sessionHandle)
+ } catch {
+ case t: Throwable =>
+ warn(s"Error closing session $sessionHandle", t)
+ }
+ if (shareLevel == ShareLevel.CONNECTION) {
+ info("Session stopped due to shared level is Connection.")
+ stopSession()
+ }
+ }
+
+ private def stopSession(): Unit = {
+ FlinkSQLEngine.currentEngine.foreach(_.stop())
}
override def stop(): Unit = synchronized {
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
index c98d07cc4..f852ad4db 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala
@@ -19,14 +19,18 @@ package org.apache.kyuubi.engine.flink.operation
import java.util.UUID
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine,
WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
-class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
+trait FlinkEngineInitializeSuite extends HiveJDBCTestHelper
with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
protected def jdbcUrl: String = getFlinkEngineServiceUrl
@@ -51,7 +55,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
- ENGINE_SHARE_LEVEL.key -> shareLevel,
+ ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
ENGINE_FLINK_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
ENGINE_SESSION_FLINK_INITIALIZE_SQL.key ->
ENGINE_SESSION_INITIALIZE_SQL_VALUE,
@@ -62,7 +66,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
def namespace: String = "/kyuubi/flink-local-engine-test"
- def shareLevel: String = ShareLevel.USER.toString
+ def shareLevel: ShareLevel
def engineType: String = "flink"
@@ -100,5 +104,33 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")
}
+ // check engine alive status after close session with connection level
engine
+ if (shareLevel == ShareLevel.CONNECTION) {
+ eventually(Timeout(10.seconds)) {
+ assert(!engineProcess.isAlive)
+ }
+ val e = intercept[Exception] {
+ withJdbcStatement() { statement =>
+ statement.executeQuery("select 1")
+ }
+ }
+ assert(e.getMessage() == "Time out retrieving Flink engine service url.")
+ }
+ // check engine alive status after close session with user level engine
+ if (shareLevel == ShareLevel.USER) {
+ assert(engineProcess.isAlive)
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("select 1")
+ assert(resultSet.next())
+ }
+ }
}
}
+
+class FlinkConnectionLevelEngineInitializeSuite extends
FlinkEngineInitializeSuite {
+ def shareLevel: ShareLevel = ShareLevel.CONNECTION
+}
+
+class FlinkUserLevelEngineInitializeSuite extends FlinkEngineInitializeSuite {
+ def shareLevel: ShareLevel = ShareLevel.USER
+}