This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 98e36de  [KYUUBI #2054] [KYUUBI-1819] Support closing Flink SQL engine process
98e36de is described below

commit 98e36deea90c08db0126f79657523d81be1a2788
Author: SteNicholas <[email protected]>
AuthorDate: Tue Mar 8 09:38:02 2022 +0800

    [KYUUBI #2054] [KYUUBI-1819] Support closing Flink SQL engine process
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Support closing Flink SQL engine process.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2054 from SteNicholas/KYUUBI-1819.
    
    Closes #2054
    
    992bd0b2 [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine process
    69567120 [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine process
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
    (cherry picked from commit 9518724c4667aacbdd750a997647115733eb35fd)
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  | 26 ++++++++++++++++++++++
 .../engine/flink/FlinkProcessBuilderSuite.scala    | 21 +++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 7fec4e6..72e4f0d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -18,9 +18,12 @@
 package org.apache.kyuubi.engine.flink
 
 import java.io.{File, FilenameFilter}
+import java.lang.ProcessBuilder.Redirect
 import java.net.URI
 import java.nio.file.{Files, Paths}
 
+import scala.collection.JavaConverters._
+
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.kyuubi._
@@ -99,6 +102,29 @@ class FlinkProcessBuilder(
 
   override protected def commands: Array[String] = Array(executable)
 
+  override def killApplication(line: String = 
lastRowsOfLog.toArray.mkString("\n")): String = {
+    "Job ID: .*".r.findFirstIn(line) match {
+      case Some(jobIdLine) =>
+        val jobId = jobIdLine.split("Job ID: ")(1).trim
+        env.get("FLINK_HOME") match {
+          case Some(flinkHome) =>
+            val pb = new ProcessBuilder("/bin/sh", s"$flinkHome/bin/flink", 
"stop", jobId)
+            pb.environment()
+              .putAll(childProcEnv.asJava)
+            pb.redirectError(Redirect.appendTo(engineLog))
+            pb.redirectOutput(Redirect.appendTo(engineLog))
+            val process = pb.start()
+            process.waitFor() match {
+              case id if id != 0 => s"Failed to kill Application $jobId, 
please kill it manually. "
+              case _ => s"Killed Application $jobId successfully. "
+            }
+          case None =>
+            s"FLINK_HOME is not set! Failed to kill Application $jobId, please 
kill it manually."
+        }
+      case None => ""
+    }
+  }
+
   override def toString: String = commands.map {
     case arg if arg.startsWith("--") => s"\\\n\t$arg"
     case arg => arg
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 94c5cb4..ee11254 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -28,4 +28,25 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     val commands = builder.toString.split(' ')
     assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
   }
+
+  test("kill application") {
+    val processBuilder = new FakeFlinkProcessBuilder(conf) {
+      override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
+    }
+    val exit1 = processBuilder.killApplication(
+      """
+        |[INFO] SQL update statement has been successfully submitted to the 
cluster:
+        |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
+        |""".stripMargin)
+    assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
+      && !exit1.contains("FLINK_HOME is not set!"))
+
+    val exit2 = processBuilder.killApplication("unknow")
+    assert(exit2.equals(""))
+  }
+}
+
+class FakeFlinkProcessBuilder(config: KyuubiConf)
+  extends FlinkProcessBuilder("fake", config) {
+  override protected def commands: Array[String] = Array("ls")
 }

Reply via email to