This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new f0c3d8b86 [KYUUBI #5306] YarnApplicationOperation supports proxy user
f0c3d8b86 is described below
commit f0c3d8b8675870d195c1790f27b0b76c55bb8ae4
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Sep 20 11:34:13 2023 +0800
[KYUUBI #5306] YarnApplicationOperation supports proxy user
### _Why are the changes needed?_
For a secured YARN cluster, the Kyuubi Server's user typically has no
permission to kill the application. The proxy user (the application owner) or an admin user should be used instead.
https://docs.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html#concept_yarn_app_acls__section_killing_an_app
> For YARN, the following three groups of users are allowed to kill a
running application:
> - The application owner
> - A cluster administrator defined in yarn.admin.acl
> - A queue administrator defined in aclAdministerApps for the queue in
which the application is running
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
Verified ADMIN mode in an internal deployment (the output message below is
formatted for readability):
```
Error: Batch e351185f-1ed8-437a-91bf-da2174e611e2 failed:
{
"id":"e351185f-1ed8-437a-91bf-da2174e611e2",
"user":"da_music",
"batchType":"SPARK",
"name":"SparkPi",
"appStartTime":0,
"appId":"application_1694730881181_58306",
"appUrl":"http://xxxx-rm-2.xxxx:8088/cluster/app/application_1694730881181_58306",
"appState":"KILLED",
"appDiagnostic":"Application application_1694730881181_58306 was killed
by user yarn at 10.49.59.149",
"kyuubiInstance":"kyuubi-1.kyuubi-headless.spark.svc.cluster.local:10099",
"state":"CANCELED",
"createTime":1695102138188,
"endTime":1695102163341,
"batchInfo":{}
}
```
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5306 from pan3793/kill-proxy-user.
Closes #5306
2b2e54307 [Cheng Pan] address comments
e7e9a9c57 [Cheng Pan] nit
9cf2afc61 [Cheng Pan] polish
ff82d1230 [Cheng Pan] polish
bf0057b41 [Cheng Pan] ApplicationManager supports proxy user
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit cd325b48aeaa9aa155f03f5ad99790e3157fd30f)
Signed-off-by: Cheng Pan <[email protected]>
---
docs/configuration/settings.md | 7 +
.../src/main/scala/org/apache/kyuubi/Utils.scala | 9 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 26 +++
.../kyuubi/engine/ApplicationOperation.scala | 14 +-
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 6 +-
.../kyuubi/engine/JpsApplicationOperation.scala | 6 +-
.../engine/KubernetesApplicationOperation.scala | 6 +-
.../kyuubi/engine/KyuubiApplicationManager.scala | 10 +-
.../kyuubi/engine/YarnApplicationOperation.scala | 183 ++++++++++++---------
.../kyuubi/operation/BatchJobSubmission.scala | 3 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 14 +-
11 files changed, 193 insertions(+), 91 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index d6d142548..ece26a1a6 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -459,6 +459,13 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.spnego.keytab | <undefined> | Keytab file for SPNego
principal
| string | 1.6.0 |
| kyuubi.spnego.principal | <undefined> | SPNego service principal,
typical value would look like HTTP/[email protected]. SPNego service principal
would be used when restful Kerberos security is enabled. This needs to be set
only if SPNEGO is to be used in authentication. | string | 1.6.0 |
+### Yarn
+
+| Key | Default |
Meaning
| Type | Since |
+|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|-------|
+| kyuubi.yarn.user.admin | yarn | When kyuubi.yarn.user.strategy is set
to ADMIN, use this admin user to construct YARN client for application
management, e.g. kill application.
| string | 1.8.0 |
+| kyuubi.yarn.user.strategy | NONE | Determine which user to use to
construct YARN client for application management, e.g. kill application.
Options: <ul><li>NONE: use Kyuubi server user.</li><li>ADMIN: use admin user
configured in `kyuubi.yarn.user.admin`.</li><li>OWNER: use session user,
typically is application owner.</li></ul> | string | 1.8.0 |
+
### Zookeeper
| Key | Default |
Meaning
| Type | Since |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index fac30a173..accfca4c9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
import java.net.{Inet4Address, InetAddress, NetworkInterface}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.security.PrivilegedAction
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
import java.util.concurrent.TimeUnit
@@ -203,6 +204,14 @@ object Utils extends Logging {
def currentUser: String =
UserGroupInformation.getCurrentUser.getShortUserName
+ def doAs[T](
+ proxyUser: String,
+ realUser: UserGroupInformation = UserGroupInformation.getCurrentUser)(f:
() => T): T = {
+ UserGroupInformation.createProxyUser(proxyUser, realUser).doAs(new
PrivilegedAction[T] {
+ override def run(): T = f()
+ })
+ }
+
private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
/**
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bbbb73b95..9cef84d6b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2818,6 +2818,32 @@ object KyuubiConf {
.version("1.7.2")
.fallbackConf(ENGINE_SUBMIT_TIMEOUT)
+ object YarnUserStrategy extends Enumeration {
+ type YarnUserStrategy = Value
+ val NONE, ADMIN, OWNER = Value
+ }
+
+ val YARN_USER_STRATEGY: ConfigEntry[String] =
+ buildConf("kyuubi.yarn.user.strategy")
+ .doc("Determine which user to use to construct YARN client for
application management, " +
+ "e.g. kill application. Options: <ul>" +
+ "<li>NONE: use Kyuubi server user.</li>" +
+ "<li>ADMIN: use admin user configured in
`kyuubi.yarn.user.admin`.</li>" +
+ "<li>OWNER: use session user, typically is application owner.</li>" +
+ "</ul>")
+ .version("1.8.0")
+ .stringConf
+ .checkValues(YarnUserStrategy)
+ .createWithDefault("NONE")
+
+ val YARN_USER_ADMIN: ConfigEntry[String] =
+ buildConf("kyuubi.yarn.user.admin")
+ .doc(s"When ${YARN_USER_STRATEGY.key} is set to ADMIN, use this admin
user to " +
+ "construct YARN client for application management, e.g. kill
application.")
+ .version("1.8.0")
+ .stringConf
+ .createWithDefault("yarn")
+
/**
* Holds information about keys that have been deprecated.
*
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 2acce39cc..23a49c1ae 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -47,11 +47,18 @@ trait ApplicationOperation {
* For example,
* if the Hadoop Yarn is used, for spark applications,
* the tag will be preset via spark.yarn.tags
+ * @param proxyUser the proxy user to use for executing kill commands.
+ * For secured YARN cluster, the Kyuubi Server's user
typically
+ * has no permission to kill the application. Admin user or
+ * application owner should be used instead.
* @return a message contains response describing how the kill process.
*
* @note For implementations, please suppress exceptions and always return
KillResponse
*/
- def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String):
KillResponse
+ def killApplicationByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ proxyUser: Option[String] = None): KillResponse
/**
* Get the engine/application status by the unique application tag
@@ -59,11 +66,16 @@ trait ApplicationOperation {
* @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* @param submitTime engine submit to resourceManager time
+ * @param proxyUser the proxy user to use for creating YARN client
+ * For secured YARN cluster, the Kyuubi Server's user may
have no permission
+ * to operate the application. Admin user or application
owner could be used
+ * instead.
* @return [[ApplicationInfo]]
*/
def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
+ proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 160e7f39e..6122a6f13 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -233,7 +233,8 @@ private[kyuubi] class EngineRef(
}
if (started + timeout <= System.currentTimeMillis()) {
- val killMessage =
engineManager.killApplication(builder.appMgrInfo(), engineRefId)
+ val killMessage =
+ engineManager.killApplication(builder.appMgrInfo(), engineRefId,
Some(appUser))
builder.close(true)
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT,
appUser)))
throw KyuubiSQLException(
@@ -254,6 +255,7 @@ private[kyuubi] class EngineRef(
val applicationInfo = engineMgr.getApplicationInfo(
builder.appMgrInfo(),
engineRefId,
+ Some(appUser),
Some(started))
applicationInfo.foreach { appInfo =>
@@ -310,7 +312,7 @@ private[kyuubi] class EngineRef(
try {
val appMgrInfo = builder.appMgrInfo()
builder.close(true)
- engineManager.killApplication(appMgrInfo, engineRefId)
+ engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser))
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 64dacbb64..1d0d58d16 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -83,14 +83,16 @@ class JpsApplicationOperation extends ApplicationOperation {
override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
- tag: String): KillResponse = {
+ tag: String,
+ proxyUser: Option[String] = None): KillResponse = {
killJpsApplicationByTag(tag, true)
}
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
- submitTime: Option[Long]): ApplicationInfo = {
+ proxyUser: Option[String] = None,
+ submitTime: Option[Long] = None): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index c9d509efe..16a0c29d1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -117,7 +117,8 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
- tag: String): KillResponse = {
+ tag: String,
+ proxyUser: Option[String] = None): KillResponse = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
}
@@ -157,7 +158,8 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
- submitTime: Option[Long]): ApplicationInfo = {
+ proxyUser: Option[String] = None,
+ submitTime: Option[Long] = None): ApplicationInfo = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 4e121d297..f8b640053 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -60,11 +60,14 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
super.stop()
}
- def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String):
KillResponse = {
+ def killApplication(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ proxyUser: Option[String] = None): KillResponse = {
var (killed, lastMessage): KillResponse = (false, null)
for (operation <- operations if !killed) {
if (operation.isSupported(appMgrInfo)) {
- val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
+ val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser)
killed = k
lastMessage = m
}
@@ -83,10 +86,11 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
def getApplicationInfo(
appMgrInfo: ApplicationManagerInfo,
tag: String,
+ proxyUser: Option[String] = None,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(appMgrInfo))
operation match {
- case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag,
submitTime))
+ case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag,
proxyUser, submitTime))
case None => None
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index d87fc406a..1f672ad70 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -21,11 +21,14 @@ import java.util.Locale
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus,
YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
import org.apache.kyuubi.engine.ApplicationOperation._
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
@@ -33,106 +36,136 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
class YarnApplicationOperation extends ApplicationOperation with Logging {
- @volatile private var yarnClient: YarnClient = _
+ private var yarnConf: Configuration = _
+ @volatile private var adminYarnClient: Option[YarnClient] = None
private var submitTimeout: Long = _
override def initialize(conf: KyuubiConf): Unit = {
submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
- val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
- // YarnClient is thread-safe
- val c = YarnClient.createYarnClient()
- c.init(yarnConf)
- c.start()
- yarnClient = c
- info(s"Successfully initialized yarn client: ${c.getServiceState}")
+ yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+
+ def createYarnClientWithCurrentUser(): Unit = {
+ val c = createYarnClient(yarnConf)
+ info(s"Creating admin YARN client with current user:
${Utils.currentUser}.")
+ adminYarnClient = Some(c)
+ }
+
+ def createYarnClientWithProxyUser(proxyUser: String): Unit =
Utils.doAs(proxyUser) { () =>
+ val c = createYarnClient(yarnConf)
+ info(s"Creating admin YARN client with proxy user: $proxyUser.")
+ adminYarnClient = Some(c)
+ }
+
+ YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY)) match {
+ case NONE =>
+ createYarnClientWithCurrentUser()
+ case ADMIN if conf.get(KyuubiConf.YARN_USER_ADMIN) == Utils.currentUser
=>
+ createYarnClientWithCurrentUser()
+ case ADMIN =>
+ createYarnClientWithProxyUser(conf.get(KyuubiConf.YARN_USER_ADMIN))
+ case OWNER =>
+ info("Skip initializing admin YARN client")
+ }
}
- override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
- yarnClient != null && appMgrInfo.resourceManager.exists(
- _.toLowerCase(Locale.ROOT).startsWith("yarn"))
+ private def createYarnClient(_yarnConf: Configuration): YarnClient = {
+ // YarnClient is thread-safe
+ val yarnClient = YarnClient.createYarnClient()
+ yarnClient.init(_yarnConf)
+ yarnClient.start()
+ yarnClient
}
- override def killApplicationByTag(
- appMgrInfo: ApplicationManagerInfo,
- tag: String): KillResponse = {
- if (yarnClient != null) {
- try {
- val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
- if (reports.isEmpty) {
- (false, NOT_FOUND)
- } else {
+ private def withYarnClient[T](proxyUser: Option[String])(action: YarnClient
=> T): T = {
+ (adminYarnClient, proxyUser) match {
+ case (Some(yarnClient), _) =>
+ action(yarnClient)
+ case (None, Some(user)) =>
+ Utils.doAs(user) { () =>
+ var yarnClient: YarnClient = null
try {
- val applicationId = reports.get(0).getApplicationId
- yarnClient.killApplication(applicationId)
- (true, s"Succeeded to terminate: $applicationId with $tag")
- } catch {
- case e: Exception =>
- (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
+ yarnClient = createYarnClient(yarnConf)
+ action(yarnClient)
+ } finally {
+ Utils.tryLogNonFatalError(yarnClient.close())
}
}
- } catch {
- case e: Exception =>
- (
- false,
- s"Failed to get while terminating application with tag $tag," +
- s" due to ${e.getMessage}")
- }
- } else {
- throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
+ case (None, None) =>
+ throw new IllegalStateException("Methods initialize and isSupported
must be called ahead")
}
}
- override def getApplicationInfoByTag(
+ override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean =
+
appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
+
+ override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
- submitTime: Option[Long]): ApplicationInfo = {
- if (yarnClient != null) {
- debug(s"Getting application info from Yarn cluster by $tag tag")
+ proxyUser: Option[String] = None): KillResponse =
withYarnClient(proxyUser) { yarnClient =>
+ try {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
- debug(s"Application with tag $tag not found")
- submitTime match {
- case Some(_submitTime) =>
- val elapsedTime = System.currentTimeMillis - _submitTime
- if (elapsedTime > submitTimeout) {
- error(s"Can't find target yarn application by tag: $tag, " +
- s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
- ApplicationInfo.NOT_FOUND
- } else {
- warn("Wait for yarn application to be submitted, " +
- s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
- ApplicationInfo.UNKNOWN
- }
- case _ => ApplicationInfo.NOT_FOUND
- }
+ (false, NOT_FOUND)
} else {
- val report = reports.get(0)
- val info = ApplicationInfo(
- id = report.getApplicationId.toString,
- name = report.getName,
- state = toApplicationState(
- report.getApplicationId.toString,
- report.getYarnApplicationState,
- report.getFinalApplicationStatus),
- url = Option(report.getTrackingUrl),
- error = Option(report.getDiagnostics))
- debug(s"Successfully got application info by $tag: $info")
- info
+ try {
+ val applicationId = reports.get(0).getApplicationId
+ yarnClient.killApplication(applicationId)
+ (true, s"Succeeded to terminate: $applicationId with $tag")
+ } catch {
+ case e: Exception =>
+ (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
+ }
}
- } else {
- throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
+ } catch {
+ case e: Exception =>
+ (
+ false,
+ s"Failed to get while terminating application with tag $tag, due to
${e.getMessage}")
}
}
- override def stop(): Unit = {
- if (yarnClient != null) {
- try {
- yarnClient.stop()
- } catch {
- case e: Exception => error(e.getMessage)
+ override def getApplicationInfoByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ proxyUser: Option[String] = None,
+ submitTime: Option[Long] = None): ApplicationInfo =
withYarnClient(proxyUser) { yarnClient =>
+ debug(s"Getting application info from Yarn cluster by $tag tag")
+ val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+ if (reports.isEmpty) {
+ debug(s"Application with tag $tag not found")
+ submitTime match {
+ case Some(_submitTime) =>
+ val elapsedTime = System.currentTimeMillis - _submitTime
+ if (elapsedTime > submitTimeout) {
+ error(s"Can't find target yarn application by tag: $tag, " +
+ s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+ ApplicationInfo.NOT_FOUND
+ } else {
+ warn("Wait for yarn application to be submitted, " +
+ s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+ ApplicationInfo.UNKNOWN
+ }
+ case _ => ApplicationInfo.NOT_FOUND
}
+ } else {
+ val report = reports.get(0)
+ val info = ApplicationInfo(
+ id = report.getApplicationId.toString,
+ name = report.getName,
+ state = toApplicationState(
+ report.getApplicationId.toString,
+ report.getYarnApplicationState,
+ report.getFinalApplicationStatus),
+ url = Option(report.getTrackingUrl),
+ error = Option(report.getDiagnostics))
+ debug(s"Successfully got application info by $tag: $info")
+ info
}
}
+
+ override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
+ Utils.tryLogNonFatalError(yarnClient.stop())
+ }
}
object YarnApplicationOperation extends Logging {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 4ea609540..779dc48ae 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -110,6 +110,7 @@ class BatchJobSubmission(
applicationManager.getApplicationInfo(
builder.appMgrInfo(),
batchId,
+ Some(session.user),
Some(_submitTime))
applicationId(applicationInfo).foreach { _ =>
if (_appStartTime <= 0) {
@@ -124,7 +125,7 @@ class BatchJobSubmission(
}
private[kyuubi] def killBatchApplication(): KillResponse = {
- applicationManager.killApplication(builder.appMgrInfo(), batchId)
+ applicationManager.killApplication(builder.appMgrInfo(), batchId,
Some(session.user))
}
private val applicationCheckInterval =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index a525ac4b2..76d913a98 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -323,6 +323,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
val batchAppStatus =
sessionManager.applicationManager.getApplicationInfo(
metadata.appMgrInfo,
batchId,
+ Some(userName),
// prevent that the batch be marked as terminated if
application state is NOT_FOUND
Some(metadata.engineOpenTime).filter(_ >
0).orElse(Some(System.currentTimeMillis)))
buildBatch(metadata, batchAppStatus)
@@ -452,9 +453,12 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
}
}
- def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String):
KillResponse = {
+ def forceKill(
+ appMgrInfo: ApplicationManagerInfo,
+ batchId: String,
+ user: String): KillResponse = {
val (killed, message) = sessionManager.applicationManager
- .killApplication(appMgrInfo, batchId)
+ .killApplication(appMgrInfo, batchId, Some(user))
info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
sessionManager.updateMetadata(Metadata(identifier = batchId,
peerInstanceClosed = true))
(killed, message)
@@ -480,7 +484,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
new CloseBatchResponse(false, s"The batch[$metadata] has been
terminated.")
} else {
info(s"Cancel batch[$batchId] with state ${metadata.state} by
killing application")
- val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+ val (killed, msg) = forceKill(metadata.appMgrInfo, batchId,
userName)
new CloseBatchResponse(killed, msg)
}
} else if (metadata.kyuubiInstance != fe.connectionUrl) {
@@ -491,12 +495,12 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to
${metadata.kyuubiInstance}", e)
- val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+ val (killed, msg) = forceKill(metadata.appMgrInfo, batchId,
userName)
new CloseBatchResponse(killed, if (killed) msg else
Utils.stringifyException(e))
}
} else { // should not happen, but handle this for safe
warn(s"Something wrong on deleting batch[$batchId], try forcibly
killing application")
- val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+ val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
new CloseBatchResponse(killed, msg)
}
}.getOrElse {