This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 98e36de [KYUUBI #2054] [KYUUBI-1819] Support closing Flink SQL engine
process
98e36de is described below
commit 98e36deea90c08db0126f79657523d81be1a2788
Author: SteNicholas <[email protected]>
AuthorDate: Tue Mar 8 09:38:02 2022 +0800
[KYUUBI #2054] [KYUUBI-1819] Support closing Flink SQL engine process
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Support closing Flink SQL engine process.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2054 from SteNicholas/KYUUBI-1819.
Closes #2054
992bd0b2 [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine
process
69567120 [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine
process
Authored-by: SteNicholas <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
(cherry picked from commit 9518724c4667aacbdd750a997647115733eb35fd)
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 26 ++++++++++++++++++++++
.../engine/flink/FlinkProcessBuilderSuite.scala | 21 +++++++++++++++++
2 files changed, 47 insertions(+)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 7fec4e6..72e4f0d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -18,9 +18,12 @@
package org.apache.kyuubi.engine.flink
import java.io.{File, FilenameFilter}
+import java.lang.ProcessBuilder.Redirect
import java.net.URI
import java.nio.file.{Files, Paths}
+import scala.collection.JavaConverters._
+
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
@@ -99,6 +102,29 @@ class FlinkProcessBuilder(
override protected def commands: Array[String] = Array(executable)
+ override def killApplication(line: String =
lastRowsOfLog.toArray.mkString("\n")): String = {
+ "Job ID: .*".r.findFirstIn(line) match {
+ case Some(jobIdLine) =>
+ val jobId = jobIdLine.split("Job ID: ")(1).trim
+ env.get("FLINK_HOME") match {
+ case Some(flinkHome) =>
+ val pb = new ProcessBuilder("/bin/sh", s"$flinkHome/bin/flink",
"stop", jobId)
+ pb.environment()
+ .putAll(childProcEnv.asJava)
+ pb.redirectError(Redirect.appendTo(engineLog))
+ pb.redirectOutput(Redirect.appendTo(engineLog))
+ val process = pb.start()
+ process.waitFor() match {
+ case id if id != 0 => s"Failed to kill Application $jobId,
please kill it manually. "
+ case _ => s"Killed Application $jobId successfully. "
+ }
+ case None =>
+ s"FLINK_HOME is not set! Failed to kill Application $jobId, please
kill it manually."
+ }
+ case None => ""
+ }
+ }
+
override def toString: String = commands.map {
case arg if arg.startsWith("--") => s"\\\n\t$arg"
case arg => arg
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 94c5cb4..ee11254 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -28,4 +28,25 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
val commands = builder.toString.split(' ')
assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
}
+
+ test("kill application") {
+ val processBuilder = new FakeFlinkProcessBuilder(conf) {
+ override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
+ }
+ val exit1 = processBuilder.killApplication(
+ """
+ |[INFO] SQL update statement has been successfully submitted to the
cluster:
+ |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
+ |""".stripMargin)
+ assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
+ && !exit1.contains("FLINK_HOME is not set!"))
+
+ val exit2 = processBuilder.killApplication("unknow")
+ assert(exit2.equals(""))
+ }
+}
+
+class FakeFlinkProcessBuilder(config: KyuubiConf)
+ extends FlinkProcessBuilder("fake", config) {
+ override protected def commands: Array[String] = Array("ls")
}