This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 93f13ef66 [KYUUBI #2419] Release engine during closing kyuubi server 
session if share level is connection
93f13ef66 is described below

commit 93f13ef662f497a0e02281cf09cbf8cd0d8577fd
Author: wforget <[email protected]>
AuthorDate: Thu Apr 28 11:20:19 2022 +0800

    [KYUUBI #2419] Release engine during closing kyuubi server session if share 
level is connection
    
    ### _Why are the changes needed?_
    
    close #2419
    
    We need to clean up the ProcBuilder process and engine application when the 
session is closed.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2482 from wForget/KYUUBI-2419.
    
    Closes #2419
    
    2690b5dc [wforget] comment
    ae8be05b [wforget] revert BatchJobSubmission
    4fe3c2f9 [wforget] [KYUUBI-2419] Destroy the ProcBuilder process and call 
killApplication during closing session
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../main/scala/org/apache/kyuubi/engine/EngineRef.scala | 17 ++++++++++++++++-
 .../scala/org/apache/kyuubi/engine/ProcBuilder.scala    |  4 ++--
 .../org/apache/kyuubi/session/KyuubiSessionImpl.scala   |  1 +
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a109d847e..5db3b63d7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -71,6 +71,8 @@ private[kyuubi] class EngineRef(
 
   private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
 
+  private var builder: ProcBuilder = _
+
   @VisibleForTesting
   private[kyuubi] val subdomain: String = 
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
     case Some(_subdomain) => _subdomain
@@ -162,7 +164,7 @@ private[kyuubi] class EngineRef(
     conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
     val started = System.currentTimeMillis()
     conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
-    val builder = engineType match {
+    builder = engineType match {
       case SPARK_SQL =>
         conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
         new SparkProcessBuilder(appUser, conf, extraEngineLog)
@@ -230,4 +232,17 @@ private[kyuubi] class EngineRef(
         create(discoveryClient, extraEngineLog)
       }
   }
+
+  def close(): Unit = {
+    if (shareLevel == CONNECTION && builder != null) {
+      try {
+        val clusterManager = builder.clusterManager()
+        builder.close(true)
+        engineManager.killApplication(clusterManager, engineRefId)
+      } catch {
+        case e: Exception =>
+          warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
+      }
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 7b7ff744d..4bc59af95 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -237,12 +237,12 @@ trait ProcBuilder {
     process
   }
 
-  def close(): Unit = synchronized {
+  def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
     if (logCaptureThread != null) {
       logCaptureThread.interrupt()
       logCaptureThread = null
     }
-    if (!waitCompletion && process != null) {
+    if (destroyProcess && process != null) {
       info("Destroy the process, since waitCompletion is false.")
       process.destroyForcibly()
       process = null
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 08f5d9499..c0c5b2705 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -160,6 +160,7 @@ class KyuubiSessionImpl(
     try {
       if (_client != null) _client.closeSession()
     } finally {
+      if (engine != null) engine.close()
       sessionEvent.endTime = System.currentTimeMillis()
       EventBus.post(sessionEvent)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))

Reply via email to