This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ffdd665  [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine …
ffdd665 is described below

commit ffdd665f79c0bc3145a516a1732ff0b93c61b4e5
Author: jiadongdong <[email protected]>
AuthorDate: Mon Mar 14 10:22:37 2022 +0800

    [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine …
    
    …initialization times out and yarn application status is accepted
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2110 from 942011334/KYUUBI-2104.
    
    Closes #2104
    
    6bb4f37c [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API 
when kyuubi engine initialization times out and yarn application status is 
accepted
    38118aaf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API 
when kyuubi engine initialization times out and yarn application status is 
accepted
    2db9c458 [jiadongdong] Merge branch 'KYUUBI-2104' of 
https://github.com/942011334/incubator-kyuubi into KYUUBI-2104
    549fab4d [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API 
when kyuubi engine initialization times out and yarn application status is 
accepted
    65c6c5cf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API 
when kyuubi engine initialization times out and yarn application status is 
accepted
    28665f3d [jiadongdong] [Kyuubi#2104] Kill yarn job using yarn client API 
when kyuubi engine initialization times out and yarn application status is 
accepted
    
    Authored-by: jiadongdong <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 kyuubi-server/pom.xml                              |  6 ++++
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 38 +++++++++++++---------
 .../engine/spark/SparkProcessBuilderSuite.scala    | 20 ++++++++----
 pom.xml                                            |  7 ++++
 4 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index e5c0a95..127e578 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -386,6 +386,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <scope>test</scope>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index ba601a7..9959997 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -18,15 +18,15 @@
 package org.apache.kyuubi.engine.spark
 
 import java.io.{File, FilenameFilter, IOException}
-import java.lang.ProcessBuilder.Redirect
 import java.net.URI
 import java.nio.file.{Files, Paths}
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.matching.Regex
 
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.client.api.YarnClient
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
@@ -35,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class SparkProcessBuilder(
     override val proxyUser: String,
@@ -44,6 +45,10 @@ class SparkProcessBuilder(
 
   import SparkProcessBuilder._
 
+  val yarnClient = getYarnClient
+
+  def getYarnClient: YarnClient = YarnClient.createYarnClient
+
   override protected val executable: String = {
     val sparkHomeOpt = env.get("SPARK_HOME").orElse {
       val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
@@ -190,20 +195,21 @@ class SparkProcessBuilder(
   override def killApplication(line: String = 
lastRowsOfLog.toArray.mkString("\n")): String =
     YARN_APP_NAME_REGEX.findFirstIn(line) match {
       case Some(appId) =>
-        env.get(KyuubiConf.KYUUBI_HOME) match {
-          case Some(kyuubiHome) =>
-            val pb = new ProcessBuilder("/bin/sh", 
s"$kyuubiHome/bin/stop-application.sh", appId)
-            pb.environment()
-              .putAll(childProcEnv.asJava)
-            pb.redirectError(Redirect.appendTo(engineLog))
-            pb.redirectOutput(Redirect.appendTo(engineLog))
-            val process = pb.start()
-            process.waitFor() match {
-              case id if id != 0 => s"Failed to kill Application $appId, 
please kill it manually. "
-              case _ => s"Killed Application $appId successfully. "
-            }
-          case None =>
-            s"KYUUBI_HOME is not set! Failed to kill Application $appId, 
please kill it manually."
+        try {
+          val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+          yarnClient.init(hadoopConf)
+          yarnClient.start()
+          val applicationId = ApplicationId.fromString(appId)
+          yarnClient.killApplication(applicationId)
+          s"Killed Application $appId successfully."
+        } catch {
+          case e: Throwable =>
+            s"Failed to kill Application $appId, please kill it manually." +
+              s" Caused by ${e.getMessage}."
+        } finally {
+          if (yarnClient != null) {
+            yarnClient.stop()
+          }
         }
       case None => ""
     }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index a9b3fae..45ff4b3 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -22,7 +22,9 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
 import java.time.Duration
 import java.util.concurrent.{Executors, TimeUnit}
 
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.scalatest.time.SpanSugar._
+import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
@@ -31,7 +33,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
 import org.apache.kyuubi.service.ServiceUtils
 
-class SparkProcessBuilderSuite extends KerberizedTestHelper {
+class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
 
   test("spark process builder") {
@@ -240,20 +242,26 @@ class SparkProcessBuilderSuite extends 
KerberizedTestHelper {
   test("kill application") {
     val pb1 = new FakeSparkProcessBuilder(conf) {
       override protected def env: Map[String, String] = Map()
+      override def getYarnClient: YarnClient = mock[YarnClient]
     }
     val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
       "Application report for application_1593587619692_20149 (state: 
ACCEPTED)")
-    assert(exit1.contains("KYUUBI_HOME is not set!"))
+    assert(exit1.contains("Killed Application application_1593587619692_20149 
successfully."))
 
     val pb2 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map("KYUUBI_HOME" -> 
"")
+      override protected def env: Map[String, String] = Map()
+      override def getYarnClient: YarnClient = null
     }
     val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
       "Application report for application_1593587619692_20149 (state: 
ACCEPTED)")
-    assert(exit2.contains("application_1593587619692_20149")
-      && !exit2.contains("KYUUBI_HOME is not set!"))
+    assert(exit2.contains("Failed to kill Application 
application_1593587619692_20149")
+      && exit2.contains("Caused by"))
 
-    val exit3 = pb2.killApplication("unknow")
+    val pb3 = new FakeSparkProcessBuilder(conf) {
+      override protected def env: Map[String, String] = Map()
+      override def getYarnClient: YarnClient = mock[YarnClient]
+    }
+    val exit3 = pb3.killApplication("unknow")
     assert(exit3.equals(""))
   }
 
diff --git a/pom.xml b/pom.xml
index 923e1a4..417eb37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
         <prometheus.version>0.14.1</prometheus.version>
         <scalacheck.version>3.2.9.0</scalacheck.version>
         <scalatest.version>3.2.9</scalatest.version>
+        <scalatestplus.version>3.2.9.0</scalatestplus.version>
         <scopt.version>4.0.1</scopt.version>
         <slf4j.version>1.7.35</slf4j.version>
         <thrift.version>0.9.3</thrift.version>
@@ -893,6 +894,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.scalatestplus</groupId>
+                <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
+                <version>${scalatestplus.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-minikdc</artifactId>
                 <version>${hadoop.version}</version>

Reply via email to