This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ffdd665 [KYUUBI #2104] Kill yarn job using yarn client API when
kyuubi engine …
ffdd665 is described below
commit ffdd665f79c0bc3145a516a1732ff0b93c61b4e5
Author: jiadongdong <[email protected]>
AuthorDate: Mon Mar 14 10:22:37 2022 +0800
[KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted
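### _Why are the changes needed?_
When Kyuubi engine initialization times out while the YARN application is still in the ACCEPTED state, kill the application through the YARN client API instead of launching `$KYUUBI_HOME/bin/stop-application.sh`, which could not run when `KYUUBI_HOME` was not set.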
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2110 from 942011334/KYUUBI-2104.
Closes #2104
6bb4f37c [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API
when kyuubi engine initialization times out and yarn application status is
accepted
38118aaf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API
when kyuubi engine initialization times out and yarn application status is
accepted
2db9c458 [jiadongdong] Merge branch 'KYUUBI-2104' of
https://github.com/942011334/incubator-kyuubi into KYUUBI-2104
549fab4d [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API
when kyuubi engine initialization times out and yarn application status is
accepted
65c6c5cf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API
when kyuubi engine initialization times out and yarn application status is
accepted
28665f3d [jiadongdong] [Kyuubi#2104] Kill yarn job using yarn client API
when kyuubi engine initialization times out and yarn application status is
accepted
Authored-by: jiadongdong <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
kyuubi-server/pom.xml | 6 ++++
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 38 +++++++++++++---------
.../engine/spark/SparkProcessBuilderSuite.scala | 20 ++++++++----
pom.xml | 7 ++++
4 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index e5c0a95..127e578 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -386,6 +386,12 @@
</dependency>
<dependency>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index ba601a7..9959997 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -18,15 +18,15 @@
package org.apache.kyuubi.engine.spark
import java.io.{File, FilenameFilter, IOException}
-import java.lang.ProcessBuilder.Redirect
import java.net.URI
import java.nio.file.{Files, Paths}
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@@ -35,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkProcessBuilder(
override val proxyUser: String,
@@ -44,6 +45,10 @@ class SparkProcessBuilder(
import SparkProcessBuilder._
+ val yarnClient = getYarnClient
+
+ def getYarnClient: YarnClient = YarnClient.createYarnClient
+
override protected val executable: String = {
val sparkHomeOpt = env.get("SPARK_HOME").orElse {
val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
@@ -190,20 +195,21 @@ class SparkProcessBuilder(
override def killApplication(line: String =
lastRowsOfLog.toArray.mkString("\n")): String =
YARN_APP_NAME_REGEX.findFirstIn(line) match {
case Some(appId) =>
- env.get(KyuubiConf.KYUUBI_HOME) match {
- case Some(kyuubiHome) =>
- val pb = new ProcessBuilder("/bin/sh",
s"$kyuubiHome/bin/stop-application.sh", appId)
- pb.environment()
- .putAll(childProcEnv.asJava)
- pb.redirectError(Redirect.appendTo(engineLog))
- pb.redirectOutput(Redirect.appendTo(engineLog))
- val process = pb.start()
- process.waitFor() match {
- case id if id != 0 => s"Failed to kill Application $appId,
please kill it manually. "
- case _ => s"Killed Application $appId successfully. "
- }
- case None =>
- s"KYUUBI_HOME is not set! Failed to kill Application $appId,
please kill it manually."
+ try {
+ val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+ yarnClient.init(hadoopConf)
+ yarnClient.start()
+ val applicationId = ApplicationId.fromString(appId)
+ yarnClient.killApplication(applicationId)
+ s"Killed Application $appId successfully."
+ } catch {
+ case e: Throwable =>
+ s"Failed to kill Application $appId, please kill it manually." +
+ s" Caused by ${e.getMessage}."
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.stop()
+ }
}
case None => ""
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index a9b3fae..45ff4b3 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -22,7 +22,9 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.scalatest.time.SpanSugar._
+import org.scalatestplus.mockito.MockitoSugar
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@@ -31,7 +33,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.service.ServiceUtils
-class SparkProcessBuilderSuite extends KerberizedTestHelper {
+class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
private def conf = KyuubiConf().set("kyuubi.on", "off")
test("spark process builder") {
@@ -240,20 +242,26 @@ class SparkProcessBuilderSuite extends
KerberizedTestHelper {
test("kill application") {
val pb1 = new FakeSparkProcessBuilder(conf) {
override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
}
val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
"Application report for application_1593587619692_20149 (state:
ACCEPTED)")
- assert(exit1.contains("KYUUBI_HOME is not set!"))
+ assert(exit1.contains("Killed Application application_1593587619692_20149
successfully."))
val pb2 = new FakeSparkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map("KYUUBI_HOME" ->
"")
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = null
}
val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
"Application report for application_1593587619692_20149 (state:
ACCEPTED)")
- assert(exit2.contains("application_1593587619692_20149")
- && !exit2.contains("KYUUBI_HOME is not set!"))
+ assert(exit2.contains("Failed to kill Application
application_1593587619692_20149")
+ && exit2.contains("Caused by"))
- val exit3 = pb2.killApplication("unknow")
+ val pb3 = new FakeSparkProcessBuilder(conf) {
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
+ }
+ val exit3 = pb3.killApplication("unknow")
assert(exit3.equals(""))
}
diff --git a/pom.xml b/pom.xml
index 923e1a4..417eb37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
<prometheus.version>0.14.1</prometheus.version>
<scalacheck.version>3.2.9.0</scalacheck.version>
<scalatest.version>3.2.9</scalatest.version>
+ <scalatestplus.version>3.2.9.0</scalatestplus.version>
<scopt.version>4.0.1</scopt.version>
<slf4j.version>1.7.35</slf4j.version>
<thrift.version>0.9.3</thrift.version>
@@ -893,6 +894,12 @@
</dependency>
<dependency>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
+ <version>${scalatestplus.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>