This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new f0c3d8b86 [KYUUBI #5306] YarnApplicationOperation supports proxy user
f0c3d8b86 is described below

commit f0c3d8b8675870d195c1790f27b0b76c55bb8ae4
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Sep 20 11:34:13 2023 +0800

    [KYUUBI #5306] YarnApplicationOperation supports proxy user
    
    ### _Why are the changes needed?_
    
    For a secured YARN cluster, the Kyuubi server's user typically has no
    permission to kill the application; a proxy user or an admin user should
    be used instead.
    
    
https://docs.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html#concept_yarn_app_acls__section_killing_an_app
    
    > For YARN, the following three groups of users are allowed to kill a running application:
    > - The application owner
    > - A cluster administrator defined in yarn.admin.acl
    > - A queue administrator defined in aclAdministerApps for the queue in which the application is running
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    Verified ADMIN mode in an internal deployment (the output message below is formatted for readability).
    ```
    Error: Batch e351185f-1ed8-437a-91bf-da2174e611e2 failed:
    {
        "id":"e351185f-1ed8-437a-91bf-da2174e611e2",
        "user":"da_music",
        "batchType":"SPARK",
        "name":"SparkPi",
        "appStartTime":0,
        "appId":"application_1694730881181_58306",
        "appUrl":"http://xxxx-rm-2.xxxx:8088/cluster/app/application_1694730881181_58306",
        "appState":"KILLED",
        "appDiagnostic":"Application application_1694730881181_58306 was killed by user yarn at 10.49.59.149",
        "kyuubiInstance":"kyuubi-1.kyuubi-headless.spark.svc.cluster.local:10099",
        "state":"CANCELED",
        "createTime":1695102138188,
        "endTime":1695102163341,
        "batchInfo":{}
    }
    ```
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5306 from pan3793/kill-proxy-user.
    
    Closes #5306
    
    2b2e54307 [Cheng Pan] address comments
    e7e9a9c57 [Cheng Pan] nit
    9cf2afc61 [Cheng Pan] polish
    ff82d1230 [Cheng Pan] polish
    bf0057b41 [Cheng Pan] ApplicationManager supports proxy user
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit cd325b48aeaa9aa155f03f5ad99790e3157fd30f)
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |   7 +
 .../src/main/scala/org/apache/kyuubi/Utils.scala   |   9 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  26 +++
 .../kyuubi/engine/ApplicationOperation.scala       |  14 +-
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |   6 +-
 .../kyuubi/engine/JpsApplicationOperation.scala    |   6 +-
 .../engine/KubernetesApplicationOperation.scala    |   6 +-
 .../kyuubi/engine/KyuubiApplicationManager.scala   |  10 +-
 .../kyuubi/engine/YarnApplicationOperation.scala   | 183 ++++++++++++---------
 .../kyuubi/operation/BatchJobSubmission.scala      |   3 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  14 +-
 11 files changed, 193 insertions(+), 91 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index d6d142548..ece26a1a6 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -459,6 +459,13 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.spnego.keytab    | &lt;undefined&gt; | Keytab file for SPNego 
principal                                                                       
                                                                                
                                                | string | 1.6.0 |
 | kyuubi.spnego.principal | &lt;undefined&gt; | SPNego service principal, 
typical value would look like HTTP/[email protected]. SPNego service principal 
would be used when restful Kerberos security is enabled. This needs to be set 
only if SPNEGO is to be used in authentication. | string | 1.6.0 |
 
+### Yarn
+
+|            Key            | Default |                                        
                                                                                
                         Meaning                                                
                                                                                
                 |  Type  | Since |
+|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|-------|
+| kyuubi.yarn.user.admin    | yarn    | When kyuubi.yarn.user.strategy is set 
to ADMIN, use this admin user to construct YARN client for application 
management, e.g. kill application.                                              
                                                                                
                           | string | 1.8.0 |
+| kyuubi.yarn.user.strategy | NONE    | Determine which user to use to 
construct YARN client for application management, e.g. kill application. 
Options: <ul><li>NONE: use Kyuubi server user.</li><li>ADMIN: use admin user 
configured in `kyuubi.yarn.user.admin`.</li><li>OWNER: use session user, 
typically is application owner.</li></ul> | string | 1.8.0 |
+
 ### Zookeeper
 
 |                       Key                        |      Default       |      
                                                                           
Meaning                                                                         
         |  Type   | Since |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index fac30a173..accfca4c9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.net.{Inet4Address, InetAddress, NetworkInterface}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.security.PrivilegedAction
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties, TimeZone, UUID}
 import java.util.concurrent.TimeUnit
@@ -203,6 +204,14 @@ object Utils extends Logging {
 
   def currentUser: String = 
UserGroupInformation.getCurrentUser.getShortUserName
 
+  def doAs[T](
+      proxyUser: String,
+      realUser: UserGroupInformation = UserGroupInformation.getCurrentUser)(f: 
() => T): T = {
+    UserGroupInformation.createProxyUser(proxyUser, realUser).doAs(new 
PrivilegedAction[T] {
+      override def run(): T = f()
+    })
+  }
+
   private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
 
   /**
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bbbb73b95..9cef84d6b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2818,6 +2818,32 @@ object KyuubiConf {
       .version("1.7.2")
       .fallbackConf(ENGINE_SUBMIT_TIMEOUT)
 
+  object YarnUserStrategy extends Enumeration {
+    type YarnUserStrategy = Value
+    val NONE, ADMIN, OWNER = Value
+  }
+
+  val YARN_USER_STRATEGY: ConfigEntry[String] =
+    buildConf("kyuubi.yarn.user.strategy")
+      .doc("Determine which user to use to construct YARN client for 
application management, " +
+        "e.g. kill application. Options: <ul>" +
+        "<li>NONE: use Kyuubi server user.</li>" +
+        "<li>ADMIN: use admin user configured in 
`kyuubi.yarn.user.admin`.</li>" +
+        "<li>OWNER: use session user, typically is application owner.</li>" +
+        "</ul>")
+      .version("1.8.0")
+      .stringConf
+      .checkValues(YarnUserStrategy)
+      .createWithDefault("NONE")
+
+  val YARN_USER_ADMIN: ConfigEntry[String] =
+    buildConf("kyuubi.yarn.user.admin")
+      .doc(s"When ${YARN_USER_STRATEGY.key} is set to ADMIN, use this admin 
user to " +
+        "construct YARN client for application management, e.g. kill 
application.")
+      .version("1.8.0")
+      .stringConf
+      .createWithDefault("yarn")
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 2acce39cc..23a49c1ae 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -47,11 +47,18 @@ trait ApplicationOperation {
    *            For example,
    *            if the Hadoop Yarn is used, for spark applications,
    *            the tag will be preset via spark.yarn.tags
+   * @param proxyUser the proxy user to use for executing kill commands.
+   *                  For secured YARN cluster, the Kyuubi Server's user 
typically
+   *                  has no permission to kill the application. Admin user or
+   *                  application owner should be used instead.
    * @return a message contains response describing how the kill process.
    *
    * @note For implementations, please suppress exceptions and always return 
KillResponse
    */
-  def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): 
KillResponse
+  def killApplicationByTag(
+      appMgrInfo: ApplicationManagerInfo,
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse
 
   /**
    * Get the engine/application status by the unique application tag
@@ -59,11 +66,16 @@ trait ApplicationOperation {
    * @param appMgrInfo the application manager information
    * @param tag the unique application tag for engine instance.
    * @param submitTime engine submit to resourceManager time
+   * @param proxyUser  the proxy user to use for creating YARN client
+   *                   For secured YARN cluster, the Kyuubi Server's user may 
have no permission
+   *                   to operate the application. Admin user or application 
owner could be used
+   *                   instead.
    * @return [[ApplicationInfo]]
    */
   def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
+      proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): ApplicationInfo
 }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 160e7f39e..6122a6f13 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -233,7 +233,8 @@ private[kyuubi] class EngineRef(
         }
 
         if (started + timeout <= System.currentTimeMillis()) {
-          val killMessage = 
engineManager.killApplication(builder.appMgrInfo(), engineRefId)
+          val killMessage =
+            engineManager.killApplication(builder.appMgrInfo(), engineRefId, 
Some(appUser))
           builder.close(true)
           MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, 
appUser)))
           throw KyuubiSQLException(
@@ -254,6 +255,7 @@ private[kyuubi] class EngineRef(
             val applicationInfo = engineMgr.getApplicationInfo(
               builder.appMgrInfo(),
               engineRefId,
+              Some(appUser),
               Some(started))
 
             applicationInfo.foreach { appInfo =>
@@ -310,7 +312,7 @@ private[kyuubi] class EngineRef(
       try {
         val appMgrInfo = builder.appMgrInfo()
         builder.close(true)
-        engineManager.killApplication(appMgrInfo, engineRefId)
+        engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser))
       } catch {
         case e: Exception =>
           warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 64dacbb64..1d0d58d16 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -83,14 +83,16 @@ class JpsApplicationOperation extends ApplicationOperation {
 
   override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     killJpsApplicationByTag(tag, true)
   }
 
   override def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = {
     val commandOption = getEngine(tag)
     if (commandOption.nonEmpty) {
       val idAndCmd = commandOption.get
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index c9d509efe..16a0c29d1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -117,7 +117,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
 
   override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     if (kyuubiConf == null) {
       throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
     }
@@ -157,7 +158,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
   override def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = {
     if (kyuubiConf == null) {
       throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
     }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 4e121d297..f8b640053 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -60,11 +60,14 @@ class KyuubiApplicationManager extends 
AbstractService("KyuubiApplicationManager
     super.stop()
   }
 
-  def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): 
KillResponse = {
+  def killApplication(
+      appMgrInfo: ApplicationManagerInfo,
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     var (killed, lastMessage): KillResponse = (false, null)
     for (operation <- operations if !killed) {
       if (operation.isSupported(appMgrInfo)) {
-        val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
+        val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser)
         killed = k
         lastMessage = m
       }
@@ -83,10 +86,11 @@ class KyuubiApplicationManager extends 
AbstractService("KyuubiApplicationManager
   def getApplicationInfo(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
+      proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): Option[ApplicationInfo] = {
     val operation = operations.find(_.isSupported(appMgrInfo))
     operation match {
-      case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, 
submitTime))
+      case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, 
proxyUser, submitTime))
       case None => None
     }
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index d87fc406a..1f672ad70 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -21,11 +21,14 @@ import java.util.Locale
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, 
YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
 import org.apache.kyuubi.engine.ApplicationOperation._
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
@@ -33,106 +36,136 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class YarnApplicationOperation extends ApplicationOperation with Logging {
 
-  @volatile private var yarnClient: YarnClient = _
+  private var yarnConf: Configuration = _
+  @volatile private var adminYarnClient: Option[YarnClient] = None
   private var submitTimeout: Long = _
 
   override def initialize(conf: KyuubiConf): Unit = {
     submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
-    val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
-    // YarnClient is thread-safe
-    val c = YarnClient.createYarnClient()
-    c.init(yarnConf)
-    c.start()
-    yarnClient = c
-    info(s"Successfully initialized yarn client: ${c.getServiceState}")
+    yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+
+    def createYarnClientWithCurrentUser(): Unit = {
+      val c = createYarnClient(yarnConf)
+      info(s"Creating admin YARN client with current user: 
${Utils.currentUser}.")
+      adminYarnClient = Some(c)
+    }
+
+    def createYarnClientWithProxyUser(proxyUser: String): Unit = 
Utils.doAs(proxyUser) { () =>
+      val c = createYarnClient(yarnConf)
+      info(s"Creating admin YARN client with proxy user: $proxyUser.")
+      adminYarnClient = Some(c)
+    }
+
+    YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY)) match {
+      case NONE =>
+        createYarnClientWithCurrentUser()
+      case ADMIN if conf.get(KyuubiConf.YARN_USER_ADMIN) == Utils.currentUser 
=>
+        createYarnClientWithCurrentUser()
+      case ADMIN =>
+        createYarnClientWithProxyUser(conf.get(KyuubiConf.YARN_USER_ADMIN))
+      case OWNER =>
+        info("Skip initializing admin YARN client")
+    }
   }
 
-  override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
-    yarnClient != null && appMgrInfo.resourceManager.exists(
-      _.toLowerCase(Locale.ROOT).startsWith("yarn"))
+  private def createYarnClient(_yarnConf: Configuration): YarnClient = {
+    // YarnClient is thread-safe
+    val yarnClient = YarnClient.createYarnClient()
+    yarnClient.init(_yarnConf)
+    yarnClient.start()
+    yarnClient
   }
 
-  override def killApplicationByTag(
-      appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
-    if (yarnClient != null) {
-      try {
-        val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
-        if (reports.isEmpty) {
-          (false, NOT_FOUND)
-        } else {
+  private def withYarnClient[T](proxyUser: Option[String])(action: YarnClient 
=> T): T = {
+    (adminYarnClient, proxyUser) match {
+      case (Some(yarnClient), _) =>
+        action(yarnClient)
+      case (None, Some(user)) =>
+        Utils.doAs(user) { () =>
+          var yarnClient: YarnClient = null
           try {
-            val applicationId = reports.get(0).getApplicationId
-            yarnClient.killApplication(applicationId)
-            (true, s"Succeeded to terminate: $applicationId with $tag")
-          } catch {
-            case e: Exception =>
-              (false, s"Failed to terminate application with $tag, due to 
${e.getMessage}")
+            yarnClient = createYarnClient(yarnConf)
+            action(yarnClient)
+          } finally {
+            Utils.tryLogNonFatalError(yarnClient.close())
           }
         }
-      } catch {
-        case e: Exception =>
-          (
-            false,
-            s"Failed to get while terminating application with tag $tag," +
-              s" due to ${e.getMessage}")
-      }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+      case (None, None) =>
+        throw new IllegalStateException("Methods initialize and isSupported 
must be called ahead")
     }
   }
 
-  override def getApplicationInfoByTag(
+  override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean =
+    
appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
+
+  override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
-    if (yarnClient != null) {
-      debug(s"Getting application info from Yarn cluster by $tag tag")
+      proxyUser: Option[String] = None): KillResponse = 
withYarnClient(proxyUser) { yarnClient =>
+    try {
       val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
       if (reports.isEmpty) {
-        debug(s"Application with tag $tag not found")
-        submitTime match {
-          case Some(_submitTime) =>
-            val elapsedTime = System.currentTimeMillis - _submitTime
-            if (elapsedTime > submitTimeout) {
-              error(s"Can't find target yarn application by tag: $tag, " +
-                s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
-              ApplicationInfo.NOT_FOUND
-            } else {
-              warn("Wait for yarn application to be submitted, " +
-                s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
-              ApplicationInfo.UNKNOWN
-            }
-          case _ => ApplicationInfo.NOT_FOUND
-        }
+        (false, NOT_FOUND)
       } else {
-        val report = reports.get(0)
-        val info = ApplicationInfo(
-          id = report.getApplicationId.toString,
-          name = report.getName,
-          state = toApplicationState(
-            report.getApplicationId.toString,
-            report.getYarnApplicationState,
-            report.getFinalApplicationStatus),
-          url = Option(report.getTrackingUrl),
-          error = Option(report.getDiagnostics))
-        debug(s"Successfully got application info by $tag: $info")
-        info
+        try {
+          val applicationId = reports.get(0).getApplicationId
+          yarnClient.killApplication(applicationId)
+          (true, s"Succeeded to terminate: $applicationId with $tag")
+        } catch {
+          case e: Exception =>
+            (false, s"Failed to terminate application with $tag, due to 
${e.getMessage}")
+        }
       }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+    } catch {
+      case e: Exception =>
+        (
+          false,
+          s"Failed to get while terminating application with tag $tag, due to 
${e.getMessage}")
     }
   }
 
-  override def stop(): Unit = {
-    if (yarnClient != null) {
-      try {
-        yarnClient.stop()
-      } catch {
-        case e: Exception => error(e.getMessage)
+  override def getApplicationInfoByTag(
+      appMgrInfo: ApplicationManagerInfo,
+      tag: String,
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = 
withYarnClient(proxyUser) { yarnClient =>
+    debug(s"Getting application info from Yarn cluster by $tag tag")
+    val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+    if (reports.isEmpty) {
+      debug(s"Application with tag $tag not found")
+      submitTime match {
+        case Some(_submitTime) =>
+          val elapsedTime = System.currentTimeMillis - _submitTime
+          if (elapsedTime > submitTimeout) {
+            error(s"Can't find target yarn application by tag: $tag, " +
+              s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+            ApplicationInfo.NOT_FOUND
+          } else {
+            warn("Wait for yarn application to be submitted, " +
+              s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+            ApplicationInfo.UNKNOWN
+          }
+        case _ => ApplicationInfo.NOT_FOUND
       }
+    } else {
+      val report = reports.get(0)
+      val info = ApplicationInfo(
+        id = report.getApplicationId.toString,
+        name = report.getName,
+        state = toApplicationState(
+          report.getApplicationId.toString,
+          report.getYarnApplicationState,
+          report.getFinalApplicationStatus),
+        url = Option(report.getTrackingUrl),
+        error = Option(report.getDiagnostics))
+      debug(s"Successfully got application info by $tag: $info")
+      info
     }
   }
+
+  override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
+    Utils.tryLogNonFatalError(yarnClient.stop())
+  }
 }
 
 object YarnApplicationOperation extends Logging {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 4ea609540..779dc48ae 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -110,6 +110,7 @@ class BatchJobSubmission(
       applicationManager.getApplicationInfo(
         builder.appMgrInfo(),
         batchId,
+        Some(session.user),
         Some(_submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
@@ -124,7 +125,7 @@ class BatchJobSubmission(
   }
 
   private[kyuubi] def killBatchApplication(): KillResponse = {
-    applicationManager.killApplication(builder.appMgrInfo(), batchId)
+    applicationManager.killApplication(builder.appMgrInfo(), batchId, 
Some(session.user))
   }
 
   private val applicationCheckInterval =
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index a525ac4b2..76d913a98 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -323,6 +323,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
               val batchAppStatus = 
sessionManager.applicationManager.getApplicationInfo(
                 metadata.appMgrInfo,
                 batchId,
+                Some(userName),
                 // prevent that the batch be marked as terminated if 
application state is NOT_FOUND
                 Some(metadata.engineOpenTime).filter(_ > 
0).orElse(Some(System.currentTimeMillis)))
               buildBatch(metadata, batchAppStatus)
@@ -452,9 +453,12 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
       }
     }
 
-    def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String): 
KillResponse = {
+    def forceKill(
+        appMgrInfo: ApplicationManagerInfo,
+        batchId: String,
+        user: String): KillResponse = {
       val (killed, message) = sessionManager.applicationManager
-        .killApplication(appMgrInfo, batchId)
+        .killApplication(appMgrInfo, batchId, Some(user))
       info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
       sessionManager.updateMetadata(Metadata(identifier = batchId, 
peerInstanceClosed = true))
       (killed, message)
@@ -480,7 +484,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
             new CloseBatchResponse(false, s"The batch[$metadata] has been 
terminated.")
           } else {
             info(s"Cancel batch[$batchId] with state ${metadata.state} by 
killing application")
-            val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+            val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, 
userName)
             new CloseBatchResponse(killed, msg)
           }
         } else if (metadata.kyuubiInstance != fe.connectionUrl) {
@@ -491,12 +495,12 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
           } catch {
             case e: KyuubiRestException =>
               error(s"Error redirecting delete batch[$batchId] to 
${metadata.kyuubiInstance}", e)
-              val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+              val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, 
userName)
               new CloseBatchResponse(killed, if (killed) msg else 
Utils.stringifyException(e))
           }
         } else { // should not happen, but handle this for safe
           warn(s"Something wrong on deleting batch[$batchId], try forcibly 
killing application")
-          val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+          val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
           new CloseBatchResponse(killed, msg)
         }
       }.getOrElse {

Reply via email to