This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 93f13ef66 [KYUUBI #2419] Release engine during closing kyuubi server
session if share level is connection
93f13ef66 is described below
commit 93f13ef662f497a0e02281cf09cbf8cd0d8577fd
Author: wforget <[email protected]>
AuthorDate: Thu Apr 28 11:20:19 2022 +0800
[KYUUBI #2419] Release engine during closing kyuubi server session if share
level is connection
### _Why are the changes needed?_
close #2419
We need to clean up the ProcBuilder process and engine application when the
session is closed.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2482 from wForget/KYUUBI-2419.
Closes #2419
2690b5dc [wforget] comment
ae8be05b [wforget] revert BatchJobSubmission
4fe3c2f9 [wforget] [KYUUBI-2419] Destroy the ProcBuilder process and call
killApplication during closing session
Authored-by: wforget <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../main/scala/org/apache/kyuubi/engine/EngineRef.scala | 17 ++++++++++++++++-
.../scala/org/apache/kyuubi/engine/ProcBuilder.scala | 4 ++--
.../org/apache/kyuubi/session/KyuubiSessionImpl.scala | 1 +
3 files changed, 19 insertions(+), 3 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a109d847e..5db3b63d7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -71,6 +71,8 @@ private[kyuubi] class EngineRef(
private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
+ private var builder: ProcBuilder = _
+
@VisibleForTesting
private[kyuubi] val subdomain: String =
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
case Some(_subdomain) => _subdomain
@@ -162,7 +164,7 @@ private[kyuubi] class EngineRef(
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
val started = System.currentTimeMillis()
conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
- val builder = engineType match {
+ builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, extraEngineLog)
@@ -230,4 +232,17 @@ private[kyuubi] class EngineRef(
create(discoveryClient, extraEngineLog)
}
}
+
+ def close(): Unit = {
+ if (shareLevel == CONNECTION && builder != null) {
+ try {
+ val clusterManager = builder.clusterManager()
+ builder.close(true)
+ engineManager.killApplication(clusterManager, engineRefId)
+ } catch {
+ case e: Exception =>
+ warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
+ }
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 7b7ff744d..4bc59af95 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -237,12 +237,12 @@ trait ProcBuilder {
process
}
- def close(): Unit = synchronized {
+ def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
logCaptureThread = null
}
- if (!waitCompletion && process != null) {
+ if (destroyProcess && process != null) {
info("Destroy the process, since waitCompletion is false.")
process.destroyForcibly()
process = null
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 08f5d9499..c0c5b2705 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -160,6 +160,7 @@ class KyuubiSessionImpl(
try {
if (_client != null) _client.closeSession()
} finally {
+ if (engine != null) engine.close()
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))