This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6d645e3 [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
5e6d645e3 is described below

commit 5e6d645e3e99034692e461f953631124bee381e6
Author: Kent Yao <[email protected]>
AuthorDate: Mon Apr 25 10:11:08 2022 +0800

    [KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application 
Operation
    
    ### _Why are the changes needed?_
    
    Add KyuubiApplicationManager in SessionManager for application management; it 
currently supports killing applications and getting application information.
    
    The underlying cluster manager operation added in this PR are
    - local jps with SIGTERM kill
    - YARN-client
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2447 from yaooqinn/2445.
    
    Closes #2445
    
    77810390 [Kent Yao] address comment
    aed3f251 [Kent Yao] address comment
    0fad9419 [Kent Yao] address comment
    67ec2500 [Kent Yao] address comment
    800bedb5 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    1a4084ab [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    be58583a [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    e75e20e7 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    baac7f04 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    e3f5c29a [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    c81e5635 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/ 
JPS-local Application Operation
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../org/apache/kyuubi/util/KyuubiHadoopUtils.scala |   5 +
 .../org.apache.kyuubi.engine.ApplicationOperation  |  19 +++
 .../kyuubi/engine/ApplicationOperation.scala       |  75 ++++++++++++
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  30 ++---
 .../kyuubi/engine/JpsApplicationOperation.scala    |  92 +++++++++++++++
 .../kyuubi/engine/KyuubiApplicationManager.scala   | 129 +++++++++++++++++++++
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  12 +-
 .../kyuubi/engine/YarnApplicationOperation.scala   | 107 +++++++++++++++++
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  33 +-----
 .../kyuubi/engine/hive/HiveProcessBuilder.scala    |   2 +-
 .../scala/org/apache/kyuubi/engine/package.scala   |  26 +++++
 .../engine/spark/SparkBatchProcessBuilder.scala    |  24 +---
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  |  65 +++--------
 .../kyuubi/engine/trino/TrinoProcessBuilder.scala  |   2 +-
 .../kyuubi/operation/BatchJobSubmission.scala      |  95 ++++++---------
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |   1 -
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |   3 +-
 .../kyuubi/session/KyuubiSessionManager.scala      |   3 +
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala |  24 ++--
 .../org/apache/kyuubi/engine/EngineRefSuite.scala  |  30 ++---
 .../engine/JpsApplicationOperationSuite.scala      |  98 ++++++++++++++++
 .../engine/flink/FlinkProcessBuilderSuite.scala    |  21 ----
 .../spark/SparkBatchProcessBuilderSuite.scala      |  78 -------------
 .../spark/SparkProcessBuilderOnYarnSuite.scala     |  71 ------------
 .../operation/KyuubiBatchYarnClusterSuite.scala    |  53 ++++++---
 .../KyuubiOperationYarnClusterSuite.scala          |  11 --
 .../org/apache/kyuubi/server/MiniYarnService.scala |  35 ++----
 27 files changed, 704 insertions(+), 440 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 85a7d65cb..5ce2c08d7 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, SecurityUtil, 
UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.kyuubi.config.KyuubiConf
 
@@ -49,6 +50,10 @@ object KyuubiHadoopUtils {
     hadoopConf
   }
 
+  def newYarnConfiguration(conf: KyuubiConf): YarnConfiguration = {
+    new YarnConfiguration(newHadoopConf(conf))
+  }
+
   def getServerPrincipal(principal: String): String = {
     SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
   }
diff --git 
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
new file mode 100644
index 000000000..90fa759e8
--- /dev/null
+++ 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.engine.YarnApplicationOperation
+org.apache.kyuubi.engine.JpsApplicationOperation
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
new file mode 100644
index 000000000..fc083a336
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.kyuubi.config.KyuubiConf
+
+trait ApplicationOperation {
+
+  /**
+   * Step for initializing the instance.
+   */
+  def initialize(conf: KyuubiConf): Unit
+
+  /**
+   * Step to clean up the instance
+   */
+  def stop(): Unit
+
+  /**
+   * Called before other method to do a quick skip
+   *
+   * @param clusterManager the underlying cluster manager or just local 
instance
+   */
+  def isSupported(clusterManager: Option[String]): Boolean
+
+  /**
+   * Kill the app/engine by the unique application tag
+   *
+   * @param tag the unique application tag for engine instance.
+   *            For example,
+   *            if the Hadoop Yarn is used, for spark applications,
+   *            the tag will be preset via spark.yarn.tags
+   * @return a message contains response describing how the kill process.
+   *
+   * @note For implementations, please suppress exceptions and always return 
KillResponse
+   */
+  def killApplicationByTag(tag: String): KillResponse
+
+  /**
+   * Get the engine/application status by the unique application tag
+   *
+   * @param tag the unique application tag for engine instance.
+   * @return a map contains the application status
+   */
+  def getApplicationInfoByTag(tag: String): Map[String, String]
+}
+
+object ApplicationOperation {
+
+  /**
+   * identifier determined by cluster manager for the engine
+   */
+  val APP_ID_KEY = "id"
+  val APP_NAME_KEY = "name"
+  val APP_STATE_KEY = "state"
+  val APP_URL_KEY = "url"
+  val APP_ERROR_KEY = "error"
+
+  val NOT_FOUND = "APPLICATION_NOT_FOUND"
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index ff5ea0063..a109d847e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.engine
 
-import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.util.Random
@@ -36,10 +35,8 @@ import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
 import org.apache.kyuubi.engine.hive.HiveProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ENGINE_REF_ID, 
HA_ZK_NAMESPACE}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, 
ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.log.OperationLog
@@ -54,7 +51,8 @@ import org.apache.kyuubi.operation.log.OperationLog
 private[kyuubi] class EngineRef(
     conf: KyuubiConf,
     user: String,
-    engineRefId: String = UUID.randomUUID().toString)
+    engineRefId: String,
+    engineManager: KyuubiApplicationManager)
   extends Logging {
   // The corresponding ServerSpace where the engine belongs to
   private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
@@ -167,24 +165,21 @@ private[kyuubi] class EngineRef(
     val builder = engineType match {
       case SPARK_SQL =>
         conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
-        // tag is a seq type with comma-separated
-        conf.set(
-          SparkProcessBuilder.TAG_KEY,
-          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + 
",").getOrElse("") +
-            "KYUUBI," + engineRefId)
         new SparkProcessBuilder(appUser, conf, extraEngineLog)
       case FLINK_SQL =>
         conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
-        // tag is a seq type with comma-separated
-        conf.set(
-          FlinkProcessBuilder.TAG_KEY,
-          conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ + 
",").getOrElse("") + "KYUUBI")
         new FlinkProcessBuilder(appUser, conf, extraEngineLog)
       case TRINO =>
         new TrinoProcessBuilder(appUser, conf, extraEngineLog)
       case HIVE_SQL =>
         new HiveProcessBuilder(appUser, conf, extraEngineLog)
     }
+    // TODO: Better to do this inside ProcBuilder
+    KyuubiApplicationManager.tagApplication(
+      engineRefId,
+      builder.shortName,
+      builder.clusterManager(),
+      builder.conf)
 
     MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
     try {
@@ -204,10 +199,7 @@ private[kyuubi] class EngineRef(
           }
         }
         if (started + timeout <= System.currentTimeMillis()) {
-          val killMessage = engineType match {
-            case SPARK_SQL => builder.killApplication(Left(engineRefId))
-            case _ => builder.killApplication()
-          }
+          val killMessage = 
engineManager.killApplication(builder.clusterManager(), engineRefId)
           process.destroyForcibly()
           MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, 
appUser)))
           throw KyuubiSQLException(
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
new file mode 100644
index 000000000..40dd8ba74
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.nio.file.Paths
+
+import scala.sys.process._
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationOperation.NOT_FOUND
+
+class JpsApplicationOperation extends ApplicationOperation {
+
+  private var runner: String = _
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
+      .map(Paths.get(_, "bin", "jps").toString)
+      .getOrElse("jps")
+    runner =
+      try {
+        jps.!!
+      } catch {
+        case _: Throwable => null
+      }
+  }
+
+  override def isSupported(clusterManager: Option[String]): Boolean = {
+    runner != null && (clusterManager.isEmpty || clusterManager.get == "local")
+  }
+
+  private def getEngine(tag: String): Option[String] = {
+    if (runner == null) {
+      None
+    } else {
+      val pb = "jps -ml" #| s"grep $tag"
+      try {
+        pb.lineStream_!.headOption
+      } catch {
+        case _: Throwable => None
+      }
+    }
+  }
+
+  override def killApplicationByTag(tag: String): KillResponse = {
+    val commandOption = getEngine(tag)
+    if (commandOption.nonEmpty) {
+      val idAndCmd = commandOption.get
+      val (id, _) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
+      try {
+        s"kill -15 $id".lineStream
+        (true, s"Succeeded to terminate: $idAndCmd")
+      } catch {
+        case e: Exception =>
+          (false, s"Failed to terminate: $idAndCmd, due to ${e.getMessage}")
+      }
+    } else {
+      (false, NOT_FOUND)
+    }
+  }
+
+  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+    val commandOption = getEngine(tag)
+    if (commandOption.nonEmpty) {
+      val idAndCmd = commandOption.get
+      val (id, cmd) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
+      Map(
+        "id" -> id,
+        "name" -> cmd,
+        "state" -> "RUNNING")
+    } else {
+      Map("state" -> "FINISHED")
+    }
+  }
+
+  override def stop(): Unit = {}
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
new file mode 100644
index 000000000..4d5e488cb
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.AbstractService
+
+class KyuubiApplicationManager extends 
AbstractService("KyuubiApplicationManager") {
+
+  // TODO: maybe add a configuration is better
+  private val operations = {
+    ServiceLoader.load(classOf[ApplicationOperation], getClass.getClassLoader)
+      .iterator().asScala.toSeq
+  }
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    operations.foreach { op =>
+      try {
+        op.initialize(conf)
+      } catch {
+        case NonFatal(e) => warn(s"Error starting 
${op.getClass.getSimpleName}: ${e.getMessage}")
+      }
+    }
+    super.initialize(conf)
+  }
+
+  override def stop(): Unit = {
+    operations.foreach { op =>
+      try {
+        op.stop()
+      } catch {
+        case NonFatal(e) => warn(s"Error stopping 
${op.getClass.getSimpleName}: ${e.getMessage}")
+      }
+    }
+    super.stop()
+  }
+
+  def killApplication(resourceManager: Option[String], tag: String): 
KillResponse = {
+    var (killed, lastMessage): KillResponse = (false, null)
+    for (operation <- operations if !killed) {
+      if (operation.isSupported(resourceManager)) {
+        val (k, m) = operation.killApplicationByTag(tag)
+        killed = k
+        lastMessage = m
+      }
+    }
+
+    val finalMessage =
+      if (lastMessage == null) {
+        s"No ${classOf[ApplicationOperation]} Service found in ServiceLoader" +
+          s" for $resourceManager"
+      } else {
+        lastMessage
+      }
+    (killed, finalMessage)
+  }
+
+  def getApplicationInfo(
+      clusterManager: Option[String],
+      tag: String): Option[Map[String, String]] = {
+    
operations.find(_.isSupported(clusterManager)).map(_.getApplicationInfoByTag(tag))
+  }
+}
+
+object KyuubiApplicationManager {
+  private def setupSparkYarnTag(tag: String, conf: KyuubiConf): Unit = {
+    val originalTag = conf.getOption("spark.yarn.tags").map(_ + 
",").getOrElse("")
+    val newTag = s"${originalTag}KYUUBI,$tag"
+    conf.set("spark.yarn.tags", newTag)
+  }
+
+  private def setupSparkK8sTag(tag: String, conf: KyuubiConf): Unit = {
+    conf.set("spark.kubernetes.driver.label.kyuubi_unique_tag", tag)
+  }
+
+  private def setupFlinkK8sTag(tag: String, conf: KyuubiConf): Unit = {
+    // TODO: yarn.tags or flink.yarn.tags, the mess of flink settings confuses 
me now.
+    val originalTag = conf.getOption("yarn.tags").map(_ + ",")
+    val newTag = s"${originalTag}KYUUBI,$tag"
+    conf.set("yarn.tags", newTag)
+  }
+
+  /**
+   * Add a unique tag on the application
+   * @param applicationTag a unique tag to identify application
+   * @param applicationType application short type, e.g. SPARK_SQL SPARK_BATCH 
is SPARK
+   * @param resourceManager yarn, kubernetes(k8s) etc.
+   * @param conf  kyuubi conf instance in session layer
+   */
+  def tagApplication(
+      applicationTag: String,
+      applicationType: String,
+      resourceManager: Option[String],
+      conf: KyuubiConf): Unit = {
+    (applicationType.toUpperCase, resourceManager.map(_.toUpperCase())) match {
+      case ("SPARK", Some("YARN")) => setupSparkYarnTag(applicationTag, conf)
+      case ("SPARK", Some("K8S")) => setupSparkK8sTag(applicationTag, conf)
+      case ("SPARK", _) =>
+        // if the master is not identified ahead, add all tags
+        setupSparkYarnTag(applicationTag, conf)
+        setupSparkK8sTag(applicationTag, conf)
+      case ("FLINK", _) =>
+        // running flink on other platforms is not yet supported
+        setupFlinkK8sTag(applicationTag, conf)
+      // other engine types are running locally yet
+      case _ =>
+    }
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 24b6f0365..7b7ff744d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -41,7 +41,7 @@ trait ProcBuilder {
    * The short name of the engine process builder, we use this for form the 
engine jar paths now
    * see `mainResource`
    */
-  protected def shortName: String
+  def shortName: String
 
   /**
    * executable, it is `JAVA_HOME/bin/java` by default
@@ -99,7 +99,7 @@ trait ProcBuilder {
 
   protected def commands: Array[String]
 
-  protected def conf: KyuubiConf
+  def conf: KyuubiConf
 
   protected def env: Map[String, String] = conf.getEnvs
 
@@ -237,12 +237,6 @@ trait ProcBuilder {
     process
   }
 
-  /**
-   * Use Left to represent engineRefId and Right to represent line.
-   */
-  def killApplication(clue: Either[String, String] = 
Right(lastRowsOfLog.toArray.mkString("\n")))
-      : String = ""
-
   def close(): Unit = synchronized {
     if (logCaptureThread != null) {
       logCaptureThread.interrupt()
@@ -332,6 +326,8 @@ trait ProcBuilder {
       s"deployment/settings.html#environments")
   }
 
+  def clusterManager(): Option[String] = None
+
 }
 
 object ProcBuilder extends Logging {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
new file mode 100644
index 000000000..9cbdd4452
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.yarn.client.api.YarnClient
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+class YarnApplicationOperation extends ApplicationOperation with Logging {
+
+  @volatile private var yarnClient: YarnClient = _
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+    // YarnClient is thread-safe
+    val c = YarnClient.createYarnClient()
+    c.init(yarnConf)
+    c.start()
+    yarnClient = c
+    info(s"Successfully initialized yarn client: ${c.getServiceState}")
+  }
+
+  override def isSupported(clusterManager: Option[String]): Boolean = {
+    yarnClient != null && clusterManager.nonEmpty && 
"yarn".equalsIgnoreCase(clusterManager.get)
+  }
+
+  override def killApplicationByTag(tag: String): KillResponse = {
+    if (yarnClient != null) {
+      try {
+        val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+        if (reports.isEmpty) {
+          (false, NOT_FOUND)
+        } else {
+          try {
+            val applicationId = reports.get(0).getApplicationId
+            yarnClient.killApplication(applicationId)
+            (true, s"Succeeded to terminate: $applicationId with $tag")
+          } catch {
+            case e: Exception =>
+              (false, s"Failed to terminate application with $tag, due to 
${e.getMessage}")
+          }
+        }
+      } catch {
+        case e: Exception =>
+          (
+            false,
+            s"Failed to get while terminating application with tag $tag," +
+              s" due to ${e.getMessage}")
+      }
+    } else {
+      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+    }
+  }
+
+  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+    if (yarnClient != null) {
+      debug(s"Getting application info from Yarn cluster by $tag tag")
+      val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+      if (reports.isEmpty) {
+        debug(s"Application with tag $tag not found")
+        null
+      } else {
+        val report = reports.get(0)
+        val res = Map(
+          APP_ID_KEY -> report.getApplicationId.toString,
+          APP_NAME_KEY -> report.getName,
+          APP_STATE_KEY -> report.getYarnApplicationState.toString,
+          APP_URL_KEY -> report.getTrackingUrl,
+          APP_ERROR_KEY -> report.getDiagnostics)
+        debug(s"Successfully got application info by $tag: " + res.mkString(", 
"))
+        res
+      }
+    } else {
+      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+    }
+  }
+
+  override def stop(): Unit = {
+    if (yarnClient != null) {
+      try {
+        yarnClient.stop()
+      } catch {
+        case e: Exception => error(e.getMessage)
+      }
+    }
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index d63d073e9..978f9bdd7 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -18,11 +18,8 @@
 package org.apache.kyuubi.engine.flink
 
 import java.io.{File, FilenameFilter}
-import java.lang.ProcessBuilder.Redirect
 import java.nio.file.Paths
 
-import scala.collection.JavaConverters._
-
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.kyuubi._
@@ -78,34 +75,6 @@ class FlinkProcessBuilder(
 
   override protected def commands: Array[String] = Array(executable)
 
-  override def killApplication(clue: Either[String, String]): String = clue 
match {
-    case Left(_) => ""
-    case Right(line) => killApplicationByLog(line)
-  }
-
-  def killApplicationByLog(line: String = 
lastRowsOfLog.toArray.mkString("\n")): String = {
-    "Job ID: .*".r.findFirstIn(line) match {
-      case Some(jobIdLine) =>
-        val jobId = jobIdLine.split("Job ID: ")(1).trim
-        env.get("FLINK_HOME") match {
-          case Some(flinkHome) =>
-            val pb = new ProcessBuilder("/bin/sh", s"$flinkHome/bin/flink", 
"stop", jobId)
-            pb.environment()
-              .putAll(childProcEnv.asJava)
-            pb.redirectError(Redirect.appendTo(engineLog))
-            pb.redirectOutput(Redirect.appendTo(engineLog))
-            val process = pb.start()
-            process.waitFor() match {
-              case id if id != 0 => s"Failed to kill Application $jobId, 
please kill it manually. "
-              case _ => s"Killed Application $jobId successfully. "
-            }
-          case None =>
-            s"FLINK_HOME is not set! Failed to kill Application $jobId, please 
kill it manually."
-        }
-      case None => ""
-    }
-  }
-
   @VisibleForTesting
   def FLINK_HOME: String = {
     // prepare FLINK_HOME
@@ -137,7 +106,7 @@ class FlinkProcessBuilder(
     }
   }
 
-  override protected def shortName: String = "flink"
+  override def shortName: String = "flink"
 }
 
 object FlinkProcessBuilder {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index 8f8ad0c33..3aa70800c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -104,5 +104,5 @@ class HiveProcessBuilder(
 
   override def toString: String = commands.mkString("\n")
 
-  override protected def shortName: String = "hive"
+  override def shortName: String = "hive"
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala
new file mode 100644
index 000000000..e1d5160e1
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+package object engine {
+
+  /**
+   * (killed or not, hint message)
+   */
+  type KillResponse = (Boolean, String)
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 108f13f64..5a5c23392 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -17,15 +17,11 @@
 
 package org.apache.kyuubi.engine.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.api.v1.BatchRequest
-import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class SparkBatchProcessBuilder(
     override val proxyUser: String,
@@ -73,23 +69,7 @@ class SparkBatchProcessBuilder(
 
   override protected def module: String = "kyuubi-spark-batch-submit"
 
-  private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
-    
batchRequest.conf.get(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY))
 match {
-      case Some("yarn") =>
-        val yarnClient = getYarnClient
-        val yarnConf = new 
YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
-        yarnClient.init(yarnConf)
-        yarnClient.start()
-        try {
-          val apps = yarnClient.getApplications(null, null, 
Set(batchId).asJava)
-          apps.asScala.headOption.map { applicationReport =>
-            applicationReport.getApplicationId.toString -> 
applicationReport.getTrackingUrl
-          }
-        } finally {
-          yarnClient.stop()
-        }
-
-      case _ => None // TODO: Support other resource manager
-    }
+  override def clusterManager(): Option[String] = {
+    batchRequest.conf.get(MASTER_KEY).orElse(defaultMaster)
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index e249846a8..0dc9b09d5 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -20,22 +20,16 @@ package org.apache.kyuubi.engine.spark
 import java.io.{File, IOException}
 import java.nio.file.Paths
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_TYPE}
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class SparkProcessBuilder(
     override val proxyUser: String,
@@ -45,8 +39,6 @@ class SparkProcessBuilder(
 
   import SparkProcessBuilder._
 
-  def getYarnClient: YarnClient = YarnClient.createYarnClient
-
   private val sparkHome = getEngineHome(shortName)
 
   override protected val executable: String = {
@@ -139,49 +131,28 @@ class SparkProcessBuilder(
     }
   }
 
-  override def killApplication(clue: Either[String, String]): String = clue 
match {
-    case Left(engineRefId) => killApplicationByTag(engineRefId)
-    case Right(_) => ""
-  }
+  override def shortName: String = "spark"
 
-  private def killApplicationByTag(engineRefId: String): String = {
-    conf.getOption(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) 
match {
-      case Some("yarn") =>
-        var applicationId: ApplicationId = null
-        val yarnClient = getYarnClient
-        try {
-          val yarnConf = new 
YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
-          yarnClient.init(yarnConf)
-          yarnClient.start()
-          val apps = yarnClient.getApplications(null, null, 
Set(engineRefId).asJava)
-          if (apps.isEmpty) return s"There are no Application tagged with 
$engineRefId," +
-            s" please kill it manually."
-          applicationId = apps.asScala.head.getApplicationId
-          yarnClient.killApplication(
-            applicationId,
-            s"Kyuubi killed this caused by: 
Timeout(${conf.get(ENGINE_INIT_TIMEOUT)} ms) to" +
-              s" launched ${conf.get(ENGINE_TYPE)} engine with $this.")
-          s"Killed Application $applicationId tagged with $engineRefId 
successfully."
-        } catch {
-          case e: Throwable =>
-            s"Failed to kill Application $applicationId tagged with 
$engineRefId," +
-              s" please kill it manually. Caused by ${e.getMessage}."
-        } finally {
-          yarnClient.stop()
+  protected lazy val defaultMaster: Option[String] = {
+    val confDir = env.getOrElse(SPARK_CONF_DIR, 
s"$sparkHome${File.separator}conf")
+    val defaults =
+      try {
+        val confFile = new 
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
+        if (confFile.exists()) {
+          Utils.getPropertiesFromFile(Some(confFile))
+        } else {
+          Map.empty[String, String]
         }
-      case _ => "Kill Application only works with YARN, please kill it 
manually." +
-          s" Application tagged with $engineRefId"
-    }
+      } catch {
+        case _: Exception =>
+          warn(s"Failed to load spark configurations from $confDir")
+          Map.empty[String, String]
+      }
+    defaults.get(MASTER_KEY)
   }
 
-  override protected def shortName: String = "spark"
-
-  protected def getSparkDefaultsConf(): Map[String, String] = {
-    val sparkDefaultsConfFile = env.get(SPARK_CONF_DIR)
-      .orElse(Option(s"$sparkHome${File.separator}conf"))
-      .map(_ + File.separator + SPARK_CONF_FILE_NAME)
-      .map(new File(_)).filter(_.exists())
-    Utils.getPropertiesFromFile(sparkDefaultsConfFile)
+  override def clusterManager(): Option[String] = {
+    conf.getOption(MASTER_KEY).orElse(defaultMaster)
   }
 }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
index 70cecb8f5..de3cbc5ca 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
@@ -96,7 +96,7 @@ class TrinoProcessBuilder(
     buffer.toArray
   }
 
-  override protected def shortName: String = "trino"
+  override def shortName: String = "trino"
 
   override def toString: String = commands.mkString("\n")
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index d8e901187..9dc2c7a80 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -26,12 +26,12 @@ import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.KyuubiException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.{ApplicationOperation, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
-import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, 
FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.api.v1.BatchRequest
-import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
 import org.apache.kyuubi.util.ThriftUtils
 
 class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: 
BatchRequest)
@@ -43,12 +43,16 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
 
   private lazy val _operationLog = OperationLog.createOperationLog(session, 
getHandle)
 
+  private val applicationManager =
+    
session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
+
   private var builder: ProcBuilder = _
 
-  @volatile
-  private[kyuubi] var appIdAndUrl: Option[(String, String)] = None
+  private val batchId: String = session.handle.identifier.toString
 
-  private var resultFetched: Boolean = _
+  private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
+    applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
+  }
 
   private val applicationCheckInterval =
     session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
@@ -57,7 +61,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
 
   override protected def beforeRun(): Unit = {
     OperationLog.setCurrentOperationLog(_operationLog)
-    setHasResultSet(false)
+    setHasResultSet(true)
     setState(OperationState.PENDING)
   }
 
@@ -86,7 +90,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
         new SparkBatchProcessBuilder(
           session.user,
           session.sessionConf,
-          session.batchId,
+          batchId,
           batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
           getOperationLog)
 
@@ -97,34 +101,29 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
     try {
       info(s"Submitting ${batchRequest.batchType} batch job: $builder")
       val process = builder.start
-      while (appIdAndUrl.isEmpty) {
-        try {
-          builder match {
-            case sparkBatchProcessBuilder: SparkBatchProcessBuilder =>
-              sparkBatchProcessBuilder.getApplicationIdAndUrl() match {
-                case Some(appInfo) => appIdAndUrl = Some(appInfo)
-                case _ =>
-              }
-
-            case _ =>
-          }
-        } catch {
-          case e: Exception => error(s"Failed to check batch application", e)
-        }
+      var applicationStatus = currentApplicationState
+      while (applicationStatus.isEmpty) {
+        applicationStatus = currentApplicationState
         Thread.sleep(applicationCheckInterval)
       }
-      process.waitFor()
-      if (process.exitValue() != 0) {
-        throw new KyuubiException(s"Process exit with value 
${process.exitValue()}")
+      val state = applicationStatus.get(ApplicationOperation.APP_STATE_KEY)
+      if (state == "KILLED" || state == "FAILED") {
+        process.destroyForcibly()
+        throw new RuntimeException("Batch job failed:" + 
applicationStatus.get.mkString(","))
+      } else {
+        process.waitFor()
+        if (process.exitValue() != 0) {
+          throw new KyuubiException(s"Process exit with value 
${process.exitValue()}")
+        }
       }
     } finally {
       builder.close()
     }
   }
 
-  override def getResultSetSchema: TTableSchema = {
+  override val getResultSetSchema: TTableSchema = {
     val schema = new TTableSchema()
-    Seq("ApplicationId", "URL").zipWithIndex.foreach { case (colName, 
position) =>
+    Seq("key", "value").zipWithIndex.foreach { case (colName, position) =>
       val tColumnDesc = new TColumnDesc()
       tColumnDesc.setColumnName(colName)
       val tTypeDesc = new TTypeDesc()
@@ -137,40 +136,14 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
   }
 
   override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
-    validateDefaultFetchOrientation(order)
-    assertState(OperationState.FINISHED)
-    setHasResultSet(true)
-    order match {
-      case FETCH_NEXT => fetchNext()
-      case FETCH_PRIOR => resultSet
-      case FETCH_FIRST => resultSet
-    }
-  }
-
-  private lazy val resultSet: TRowSet = {
-    val tRow = new TRowSet(0, new JArrayList[TRow](1))
-    val (appId, url) = appIdAndUrl.toSeq.unzip
-
-    val tAppIdColumn = TColumn.stringVal(new TStringColumn(
-      appId.asJava,
-      ByteBuffer.allocate(0)))
-
-    val tUrlColumn = TColumn.stringVal(new TStringColumn(
-      url.asJava,
-      ByteBuffer.allocate(0)))
-
-    tRow.addToColumns(tAppIdColumn)
-    tRow.addToColumns(tUrlColumn)
-    tRow
-  }
-
-  private def fetchNext(): TRowSet = {
-    if (!resultFetched) {
-      resultFetched = true
-      resultSet
-    } else {
-      ThriftUtils.EMPTY_ROW_SET
-    }
+    currentApplicationState.map { state =>
+      val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
+      Seq(state.keys, state.values).map(_.toSeq.asJava).foreach { col =>
+        val tCol = TColumn.stringVal(new TStringColumn(col, 
ByteBuffer.allocate(0)))
+        tRow.addToColumns(tCol)
+      }
+      tRow
+    }.getOrElse(ThriftUtils.EMPTY_ROW_SET)
   }
 
   override def close(): Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 8352cd031..7926e68c0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -38,7 +38,6 @@ class KyuubiBatchSessionImpl(
     batchRequest: BatchRequest)
   extends KyuubiSession(protocol, user, password, ipAddress, conf, 
sessionManager) {
   override val handle: SessionHandle = 
sessionManager.newBatchSessionHandle(protocol)
-  val batchId: String = handle.identifier.toString
 
   // TODO: Support batch conf advisor
   override val normalizedConf: Map[String, String] =
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d0b4b865e..da7e6a61e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -64,7 +64,8 @@ class KyuubiSessionImpl(
     case (key, value) => sessionConf.set(key, value)
   }
 
-  val engine: EngineRef = new EngineRef(sessionConf, user)
+  val engine: EngineRef =
+    new EngineRef(sessionConf, user, handle.identifier.toString, 
sessionManager.applicationManager)
   private[kyuubi] val launchEngineOp = sessionManager.operationManager
     .newLaunchEngineOperation(this, 
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 97ba4fddf..851492cf1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.cli.HandleIdentifier
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.credentials.HadoopCredentialsManager
+import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -42,10 +43,12 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   val credentialsManager = new HadoopCredentialsManager()
   // this lazy is must be specified since the conf is null when the class 
initialization
   lazy val sessionConfAdvisor: SessionConfAdvisor = 
PluginLoader.loadSessionConfAdvisor(conf)
+  val applicationManager = new KyuubiApplicationManager()
 
   private var limiter: Option[SessionLimiter] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
+    addService(applicationManager)
     addService(credentialsManager)
     val absPath = 
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
     _operationLogRoot = Some(absPath.toAbsolutePath.toString)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 633e8e8f0..d10443047 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.engine.YarnApplicationOperation
 import org.apache.kyuubi.server.MiniYarnService
 
 /**
@@ -27,28 +28,35 @@ import org.apache.kyuubi.server.MiniYarnService
  * may be thrown `/bin/bash: /bin/java: No such file or directory`.
  */
 trait WithKyuubiServerOnYarn extends WithKyuubiServer {
-  protected val kyuubiServerConf: KyuubiConf
-  protected val connectionConf: Map[String, String]
-  private var miniYarnService: MiniYarnService = _
+  override protected val conf: KyuubiConf = new KyuubiConf()
 
-  final override protected lazy val conf: KyuubiConf = {
-    connectionConf.foreach { case (k, v) => kyuubiServerConf.set(k, v) }
-    kyuubiServerConf
+  protected lazy val yarnOperation: YarnApplicationOperation = {
+    val operation = new YarnApplicationOperation()
+    operation.initialize(miniYarnService.getConf)
+    operation
   }
 
+  protected var miniYarnService: MiniYarnService = _
+
   override def beforeAll(): Unit = {
+    conf.set("spark.master", "yarn")
+      .set("spark.executor.instances", "1")
     miniYarnService = new MiniYarnService()
-    miniYarnService.initialize(new KyuubiConf(false))
+    miniYarnService.initialize(conf)
     miniYarnService.start()
     conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", 
miniYarnService.getHadoopConfDir)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
+    // stop kyuubi server
+    // stop yarn operation client
+    // stop yarn cluster
+    super.afterAll()
+    yarnOperation.stop()
     if (miniYarnService != null) {
       miniYarnService.stop()
       miniYarnService = null
     }
-    super.afterAll()
   }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 8b82db990..548e4f13d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -65,7 +65,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     Seq(None, Some("suffix")).foreach { domain =>
       conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
       domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
-      val engine = new EngineRef(conf, user, id)
+      val engine = new EngineRef(conf, user, id, null)
       assert(engine.engineSpace ===
         DiscoveryPaths.makePath(
           s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}",
@@ -79,7 +79,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
-    val appName = new EngineRef(conf, user, id)
+    val appName = new EngineRef(conf, user, id, null)
     assert(appName.engineSpace ===
       DiscoveryPaths.makePath(
         s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL",
@@ -91,7 +91,7 @@ class EngineRefSuite extends KyuubiFunSuite {
       k =>
         conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
         conf.set(k.key, "abc")
-        val appName2 = new EngineRef(conf, user, id)
+        val appName2 = new EngineRef(conf, user, id, null)
         assert(appName2.engineSpace ===
           DiscoveryPaths.makePath(
             s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}",
@@ -105,7 +105,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
-    val engineRef = new EngineRef(conf, user, id)
+    val engineRef = new EngineRef(conf, user, id, null)
     val primaryGroupName = 
UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
     assert(engineRef.engineSpace ===
       DiscoveryPaths.makePath(
@@ -119,7 +119,7 @@ class EngineRefSuite extends KyuubiFunSuite {
       k =>
         conf.unset(k)
         conf.set(k.key, "abc")
-        val engineRef2 = new EngineRef(conf, user, id)
+        val engineRef2 = new EngineRef(conf, user, id, null)
         assert(engineRef2.engineSpace ===
           DiscoveryPaths.makePath(
             s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}",
@@ -132,7 +132,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val userName = "Iamauserwithoutgroup"
     val newUGI = UserGroupInformation.createRemoteUser(userName)
     assert(newUGI.getGroupNames.isEmpty)
-    val engineRef3 = new EngineRef(conf, userName, id)
+    val engineRef3 = new EngineRef(conf, userName, id, null)
     assert(engineRef3.engineSpace ===
       DiscoveryPaths.makePath(
         s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
@@ -145,7 +145,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
     conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
-    val appName = new EngineRef(conf, user, id)
+    val appName = new EngineRef(conf, user, id, null)
     assert(appName.engineSpace ===
       DiscoveryPaths.makePath(
         s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
@@ -154,7 +154,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     assert(appName.defaultEngineName === 
s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
 
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
-    val appName2 = new EngineRef(conf, user, id)
+    val appName2 = new EngineRef(conf, user, id, null)
     assert(appName2.engineSpace ===
       DiscoveryPaths.makePath(
         s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
@@ -169,31 +169,31 @@ class EngineRefSuite extends KyuubiFunSuite {
     // set subdomain and disable engine pool
     conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
     conf.set(ENGINE_POOL_SIZE, -1)
-    val engine1 = new EngineRef(conf, user, id)
+    val engine1 = new EngineRef(conf, user, id, null)
     assert(engine1.subdomain === "abc")
 
     // unset subdomain and disable engine pool
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, -1)
-    val engine2 = new EngineRef(conf, user, id)
+    val engine2 = new EngineRef(conf, user, id, null)
     assert(engine2.subdomain === "default")
 
     // set subdomain and 1 <= engine pool size < threshold
     conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
     conf.set(ENGINE_POOL_SIZE, 1)
-    val engine3 = new EngineRef(conf, user, id)
+    val engine3 = new EngineRef(conf, user, id, null)
     assert(engine3.subdomain === "abc")
 
     // unset subdomain and 1 <= engine pool size < threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 3)
-    val engine4 = new EngineRef(conf, user, id)
+    val engine4 = new EngineRef(conf, user, id, null)
     assert(engine4.subdomain.startsWith("engine-pool-"))
 
     // unset subdomain and engine pool size > threshold
     conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.set(ENGINE_POOL_SIZE, 100)
-    val engine5 = new EngineRef(conf, user, id)
+    val engine5 = new EngineRef(conf, user, id, null)
     val engineNumber = Integer.parseInt(engine5.subdomain.substring(12))
     val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
     assert(engineNumber <= threshold)
@@ -203,7 +203,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     val enginePoolName = "test-pool"
     conf.set(ENGINE_POOL_NAME, enginePoolName)
     conf.set(ENGINE_POOL_SIZE, 3)
-    val engine6 = new EngineRef(conf, user, id)
+    val engine6 = new EngineRef(conf, user, id, null)
     assert(engine6.subdomain.startsWith(s"$enginePoolName-"))
   }
 
@@ -214,7 +214,7 @@ class EngineRefSuite extends KyuubiFunSuite {
     conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
     conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
-    val engine = new EngineRef(conf, user, id)
+    val engine = new EngineRef(conf, user, id, null)
 
     var port1 = 0
     var port2 = 0
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
new file mode 100644
index 000000000..414fb35ec
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.lang.management.ManagementFactory
+import java.time.Duration
+import java.util.{ServiceLoader, UUID}
+
+import scala.collection.JavaConverters._
+import scala.sys.process._
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.SESSION_IDLE_TIMEOUT
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+
+class JpsApplicationOperationSuite extends KyuubiFunSuite {
+  private val operations = ServiceLoader.load(classOf[ApplicationOperation])
+    
.asScala.filter(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation]))
+  private val jps = operations.head
+  jps.initialize(null)
+
+  test("JpsApplicationOperation with jstat") {
+    assert(jps.isSupported(None))
+    assert(jps.isSupported(Some("local")))
+    assert(!jps.killApplicationByTag(null)._1)
+    assert(!jps.killApplicationByTag("have a space")._1)
+    val currentProcess = ManagementFactory.getRuntimeMXBean.getName
+    val currentPid = currentProcess.splitAt(currentProcess.indexOf("@"))._1
+
+    new Thread {
+      override def run(): Unit = {
+        s"jstat -gcutil $currentPid 1000".!
+      }
+    }.start()
+
+    val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+    assert(desc1.contains("id"))
+    assert(desc1.contains("name"))
+    assert(desc1("state") === "RUNNING")
+
+    jps.killApplicationByTag("sun.tools.jstat.Jstat")
+
+    val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+    assert(!desc2.contains("id"))
+    assert(!desc2.contains("name"))
+    assert(desc2("state") === "FINISHED")
+  }
+
+  test("JpsApplicationOperation with spark local mode") {
+    val user = Utils.currentUser
+    val id = UUID.randomUUID().toString
+    val conf = new KyuubiConf()
+      .set("spark.abc", id)
+      .set("spark.master", "local")
+      .set(SESSION_IDLE_TIMEOUT, Duration.ofMinutes(3).toMillis)
+    val builder = new SparkProcessBuilder(user, conf)
+    builder.start
+
+    assert(jps.isSupported(builder.clusterManager()))
+    eventually(Timeout(10.seconds)) {
+      val desc1 = jps.getApplicationInfoByTag(id)
+      assert(desc1.contains("id"))
+      assert(desc1("name").contains(id))
+      assert(desc1("state") === "RUNNING")
+    }
+
+    val response = jps.killApplicationByTag(id)
+    assert(response._1, response._2)
+    assert(response._2 startsWith "Succeeded to terminate:")
+
+    val desc2 = jps.getApplicationInfoByTag(id)
+    assert(!desc2.contains("id"))
+    assert(!desc2.contains("name"))
+    assert(desc2("state") === "FINISHED")
+    val response2 = jps.killApplicationByTag(id)
+    assert(!response2._1)
+    assert(response2._2 === ApplicationOperation.NOT_FOUND)
+  }
+}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 28ce47ba8..94c5cb450 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -28,25 +28,4 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     val commands = builder.toString.split(' ')
     assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
   }
-
-  test("kill application") {
-    val processBuilder = new FakeFlinkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
-    }
-    val exit1 = processBuilder.killApplication(
-      Right("""
-              |[INFO] SQL update statement has been successfully submitted to 
the cluster:
-              |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
-              |""".stripMargin))
-    assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
-      && !exit1.contains("FLINK_HOME is not set!"))
-
-    val exit2 = processBuilder.killApplication(Right("unknow"))
-    assert(exit2.equals(""))
-  }
-}
-
-class FakeFlinkProcessBuilder(config: KyuubiConf)
-  extends FlinkProcessBuilder("fake", config) {
-  override protected def commands: Array[String] = Array("ls")
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
deleted file mode 100644
index 2dd8a0e5e..000000000
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark
-
-import java.util.UUID
-
-import scala.concurrent.duration._
-
-import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.server.MiniYarnService
-import org.apache.kyuubi.server.api.v1.BatchRequest
-
-class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
-  private val conf = KyuubiConf().set("kyuubi.on", "off")
-  private var miniYarnService: MiniYarnService = _
-
-  override def beforeAll(): Unit = {
-    miniYarnService = new MiniYarnService()
-    miniYarnService.initialize(new KyuubiConf(false))
-    miniYarnService.start()
-    conf.set(
-      s"${KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX}.HADOOP_CONF_DIR",
-      miniYarnService.getHadoopConfDir)
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    if (miniYarnService != null) {
-      miniYarnService.stop()
-      miniYarnService = null
-    }
-    super.afterAll()
-  }
-
-  test("spark batch process builder") {
-    val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
-
-    val batchRequest = BatchRequest(
-      "spark",
-      sparkProcessBuilder.mainResource.get,
-      "kyuubi",
-      sparkProcessBuilder.mainClass,
-      "spark-batch-submission",
-      Map("spark.master" -> "yarn"),
-      Seq.empty[String])
-
-    val builder = new SparkBatchProcessBuilder(
-      batchRequest.proxyUser,
-      conf,
-      UUID.randomUUID().toString,
-      batchRequest)
-    val proc = builder.start
-
-    eventually(timeout(3.minutes), interval(500.milliseconds)) {
-      val applicationIdAndUrl = builder.getApplicationIdAndUrl()
-      assert(applicationIdAndUrl.isDefined)
-      assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
-      assert(applicationIdAndUrl.exists(_._2.nonEmpty))
-    }
-    proc.destroyForcibly()
-  }
-}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
deleted file mode 100644
index 245a10719..000000000
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark
-
-import java.util.UUID
-
-import scala.concurrent.duration.DurationInt
-
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.scalatestplus.mockito.MockitoSugar.mock
-
-import org.apache.kyuubi.{Utils, WithKyuubiServerOnYarn}
-import org.apache.kyuubi.config.KyuubiConf
-
-class SparkProcessBuilderOnYarnSuite extends WithKyuubiServerOnYarn {
-
-  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
-
-  override protected val connectionConf: Map[String, String] = Map(
-    "spark.master" -> "yarn",
-    "spark.executor.instances" -> "1")
-
-  test("test kill application") {
-    val engineRefId = UUID.randomUUID().toString
-
-    conf.set(
-      SparkProcessBuilder.TAG_KEY,
-      conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
-        "KYUUBI," + engineRefId)
-    val builder = new SparkProcessBuilder(Utils.currentUser, conf)
-    val proc = builder.start
-    eventually(timeout(3.minutes), interval(1.seconds)) {
-      val killMsg = builder.killApplication(Left(engineRefId))
-      assert(killMsg.contains(s"tagged with $engineRefId successfully."))
-    }
-    proc.destroyForcibly()
-
-    val pb1 = new FakeSparkProcessBuilder(conf.clone) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    val exit1 = pb1.killApplication(Left(engineRefId))
-    assert(exit1.equals(s"There are no Application tagged with $engineRefId," +
-      s" please kill it manually."))
-
-    val pb2 = new FakeSparkProcessBuilder(conf.clone) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    pb2.conf.set("spark.master", "local")
-    val exit2 = pb2.killApplication(Left(engineRefId))
-    assert(exit2.equals("Kill Application only works with YARN, please kill it manually." +
-      s" Application tagged with $engineRefId"))
-  }
-
-}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
index 28f434faf..b1748a2b9 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -26,14 +26,14 @@ import org.apache.kyuubi.WithKyuubiServerOnYarn
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.operation.OperationState.ERROR
 import org.apache.kyuubi.server.api.v1.BatchRequest
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
 
 class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
-  override protected val connectionConf: Map[String, String] = Map.empty
 
-  override protected val kyuubiServerConf: KyuubiConf = {
-    KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
+  override protected val conf: KyuubiConf = {
+    new KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
       .set(BATCH_CONF_IGNORE_LIST, Seq("spark.master"))
   }
 
@@ -55,7 +55,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
         s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
       Seq.empty[String])
 
-    val sessionHandle = sessionManager.openBatchSession(
+    val sessionHandle = sessionManager().openBatchSession(
       TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
       batchRequest.proxyUser,
       "passwd",
@@ -67,20 +67,39 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
    val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
-    eventually(timeout(3.minutes), interval(500.milliseconds)) {
-      val applicationIdAndUrl = batchJobSubmissionOp.appIdAndUrl
-      assert(applicationIdAndUrl.isDefined)
-      assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
-      assert(applicationIdAndUrl.exists(_._2.nonEmpty))
-
-      assert(batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
-      val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 1)
-        .getColumns.asScala
-      val appId = resultColumns.apply(0).getStringVal.getValues.asScala.apply(0)
-      val url = resultColumns.apply(1).getStringVal.getValues.asScala.apply(0)
-      assert(appId === batchJobSubmissionOp.appIdAndUrl.get._1)
-      assert(url === batchJobSubmissionOp.appIdAndUrl.get._2)
+    eventually(timeout(3.minutes), interval(50.milliseconds)) {
+      val state = batchJobSubmissionOp.currentApplicationState
+      assert(state.nonEmpty)
+      assert(state.exists(_("id").startsWith("application_")))
     }
+
+    val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
+    assert(killResponse._1)
+    assert(killResponse._2 startsWith "Succeeded to terminate:")
+
+    val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+
+    assert(appInfo("state") === "KILLED")
+    assert(batchJobSubmissionOp.getStatus.state === ERROR)
+
+    val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
+      .getColumns.asScala
+
+    val keys = resultColumns.head.getStringVal.getValues.asScala
+    val values = resultColumns.apply(1).getStringVal.getValues.asScala
+    val rows = keys.zip(values).toMap
+    val appId = rows("id")
+    val appName = rows("name")
+    val appState = rows("state")
+    val appUrl = rows("url")
+    val appError = rows("error")
+
+    val state2 = batchJobSubmissionOp.currentApplicationState.get
+    assert(appId === state2("id"))
+    assert(appName === state2("name"))
+    assert(appState === state2("state"))
+    assert(appUrl === state2("url"))
+    assert(appError === state2("error"))
     sessionManager.closeSession(sessionHandle)
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
index 7c2c32837..d708c2fd0 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
@@ -18,22 +18,11 @@
 package org.apache.kyuubi.operation
 
 import org.apache.kyuubi.WithKyuubiServerOnYarn
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
 
class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with SparkQueryTests {
 
   override protected def jdbcUrl: String = getJdbcUrl
 
-  override protected val kyuubiServerConf: KyuubiConf = {
-    // TODO KYUUBI #745
-    KyuubiConf().set(ENGINE_INIT_TIMEOUT, 600000L)
-  }
-
-  override protected val connectionConf: Map[String, String] = Map(
-    "spark.master" -> "yarn",
-    "spark.executor.instances" -> "1")
-
   test("KYUUBI #527- Support test with mini yarn cluster") {
     withJdbcStatement() { statement =>
      val resultSet = statement.executeQuery("""SELECT "${spark.app.id}" as id""")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 356c135a7..1a73cc24c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.server
 
 import java.io.{File, FileWriter}
-import java.net.{InetAddress, URLClassLoader}
+import java.net.InetAddress
 
 import scala.collection.JavaConverters._
 
@@ -26,23 +26,15 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.service.AbstractService
 
-class MiniYarnService(name: String) extends AbstractService(name) {
-  def this() = this(classOf[MiniYarnService].getSimpleName)
+class MiniYarnService extends AbstractService("TestMiniYarnService") {
 
-  private var hadoopConfDir: File = _
-  private var yarnConf: YarnConfiguration = _
-  private var yarnCluster: MiniYARNCluster = _
-
-  private val classLoader = Thread.currentThread().getContextClassLoader
-
-  private def newYarnConfig(): YarnConfiguration = {
+  private val hadoopConfDir: File = Utils.createTempDir().toFile
+  private val yarnConf: YarnConfiguration = {
     val yarnConfig = new YarnConfiguration()
    // Disable the disk utilization check to avoid the test hanging when people's disks are
     // getting full.
@@ -77,41 +69,32 @@ class MiniYarnService(name: String) extends AbstractService(name) {
     yarnConfig.set(s"hadoop.proxyuser.$currentUser.hosts", "*")
     yarnConfig
   }
+  private val yarnCluster: MiniYARNCluster = new MiniYARNCluster(getName, 1, 1, 1)
 
   override def initialize(conf: KyuubiConf): Unit = {
-    hadoopConfDir = Utils.createTempDir().toFile
-    yarnConf = newYarnConfig()
-    yarnCluster = new MiniYARNCluster(name, 1, 1, 1)
     yarnCluster.init(yarnConf)
     super.initialize(conf)
   }
 
   override def start(): Unit = {
     yarnCluster.start()
-    val config = yarnCluster.getConfig
-    eventually(timeout(10.seconds), interval(100.milliseconds)) {
-      config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0"
-    }
-    info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
     saveHadoopConf()
     super.start()
-
-    val hadoopConfClassLoader = new URLClassLoader(Array(hadoopConfDir.toURI.toURL), classLoader)
-    Thread.currentThread().setContextClassLoader(hadoopConfClassLoader)
   }
 
   override def stop(): Unit = {
     if (yarnCluster != null) yarnCluster.stop()
-    if (hadoopConfDir != null) hadoopConfDir.delete()
     super.stop()
-    Thread.currentThread().setContextClassLoader(classLoader)
   }
 
   private def saveHadoopConf(): Unit = {
     val configToWrite = new Configuration(false)
     val hostName = InetAddress.getLocalHost.getHostName
     yarnCluster.getConfig.iterator().asScala.foreach { kv =>
-      configToWrite.set(kv.getKey, kv.getValue.replaceAll(hostName, "localhost"))
+      val key = kv.getKey
+      val value = kv.getValue.replaceAll(hostName, "localhost")
+      configToWrite.set(key, value)
+      getConf.set(key, value)
     }
     val writer = new FileWriter(new File(hadoopConfDir, "yarn-site.xml"))
     configToWrite.writeXml(writer)

Reply via email to