This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c8e645734 [KYUUBI #6344] FlinkProcessBuilder prioritizes user
configurations
c8e645734 is described below
commit c8e645734b0cee93627741c24d3e9d8ff16a4e11
Author: wforget <[email protected]>
AuthorDate: Tue May 14 09:54:57 2024 +0800
[KYUUBI #6344] FlinkProcessBuilder prioritizes user configurations
# :mag: Description
## Issue References ๐
This pull request fixes #6344
`FlinkProcessBuilder` specifies `yarn.ship-files`, `yarn.application.name`
and `yarn.tags` configurations of kyuubi platform. Sometimes we also need to
customize these configurations, so we should prioritize these user
configurations.
## Describe Your Solution ๐ง
FlinkProcessBuilder prioritizes user configurations.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
added new unit test
---
# Checklist ๐
- [X] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6342 from wForget/hotfix2.
Closes #6344
feca972ca [wforget] address comment
17df0844d [wforget] fix test and add flink constant
ece91cc0c [wforget] FlinkProcessBuilder prioritizes user configurations
Authored-by: wforget <[email protected]>
Signed-off-by: wforget <[email protected]>
---
.../kyuubi/engine/KyuubiApplicationManager.scala | 5 +++-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 17 +++++++++---
.../engine/flink/FlinkProcessBuilderSuite.scala | 31 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 0e914987c..f2887b3e9 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -115,7 +115,10 @@ object KyuubiApplicationManager {
}
private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
- val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ +
",").getOrElse("")
+ val originalTag = conf
+
.getOption(s"${FlinkProcessBuilder.FLINK_CONF_PREFIX}.${FlinkProcessBuilder.YARN_TAG_KEY}")
+ .orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY))
+ .map(_ + ",").getOrElse("")
val newTag = s"${originalTag}KYUUBI" +
Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 4ae714dee..241b7ec78 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -115,10 +115,15 @@ class FlinkProcessBuilder(
flinkExtraJars += s"$hiveConfFile"
}
+ val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "")
+ // add custom yarn.ship-files
+ flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
+ val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
+ .orElse(conf.getOption(APP_KEY))
buffer += "-t"
buffer += "yarn-application"
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
- buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
+ buffer += s"-Dyarn.application.name=${yarnAppName.get}"
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
@@ -126,8 +131,10 @@ class FlinkProcessBuilder(
buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
}
- val customFlinkConf = conf.getAllWithPrefix("flink", "")
- customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
+ customFlinkConf.filter { case (k, _) =>
+ !Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY,
YARN_TAG_KEY)
+ .contains(k)
+ }.foreach { case (k, v) =>
buffer += s"-D$k=$v"
}
@@ -213,8 +220,12 @@ class FlinkProcessBuilder(
object FlinkProcessBuilder {
final val FLINK_EXEC_FILE = "flink"
+ final val FLINK_CONF_PREFIX = "flink"
final val APP_KEY = "flink.app.name"
final val YARN_TAG_KEY = "yarn.tags"
+ final val YARN_SHIP_FILES_KEY = "yarn.ship-files"
+ final val YARN_APPLICATION_NAME_KEY = "yarn.application.name"
+
final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 39fee1163..952f71c08 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
}
matchActualAndExpectedApplicationMode(builder)
}
+
+ test("user configuration takes priority") {
+ val customShipFiles = "testFile1.jar;testFile2.jar"
+ val customAppName = "testAppName"
+ val customYarnTags = "testTag1,testTag2"
+ val builderConf = applicationModeConf
+ builderConf.set("flink.yarn.ship-files", customShipFiles)
+ builderConf.set("flink.yarn.application.name", customAppName)
+ builderConf.set("flink.yarn.tags", customYarnTags)
+ val builder = new FlinkProcessBuilder("test", true, builderConf) {
+ override def env: Map[String, String] = envWithAllHadoop
+ }
+ val actualCommands = builder.toString
+ // scalastyle:off line.size.limit
+ val expectedCommands =
+ escapePaths(
+ s"""${builder.flinkExecutable} run-application \\\\
+ |\\t-t yarn-application \\\\
+
|\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles
\\\\
+ |\\t-Dyarn.application.name=$customAppName \\\\
+ |\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\
+ |\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\
+ |\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\
+ |\\t-Dexecution.target=yarn-application \\\\
+ |\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine
.*kyuubi-flink-sql-engine_.*jar""".stripMargin +
+ "(?: \\\\\\n\\t--conf \\S+=\\S+)+")
+ // scalastyle:on line.size.limit
+ val regex = new Regex(expectedCommands)
+ val matcher = regex.pattern.matcher(actualCommands)
+ assert(matcher.matches())
+ }
}