This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 6d0c874a7 [KYUUBI #6344] FlinkProcessBuilder prioritizes user 
configurations
6d0c874a7 is described below

commit 6d0c874a70d80bd6ccae7f49b044374e3b162ef2
Author: wforget <[email protected]>
AuthorDate: Tue May 14 09:54:57 2024 +0800

    [KYUUBI #6344] FlinkProcessBuilder prioritizes user configurations
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6344
    
    `FlinkProcessBuilder` specifies `yarn.ship-files`, `yarn.application.name` 
and `yarn.tags` configurations of kyuubi platform. Sometimes we also need to 
customize these configurations, so we should prioritize these user 
configurations.
    
    ## Describe Your Solution ๐Ÿ”ง
    
    FlinkProcessBuilder prioritizes user configurations.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    added new unit test
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [X] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6342 from wForget/hotfix2.
    
    Closes #6344
    
    feca972ca [wforget] address comment
    17df0844d [wforget] fix test and add flink constant
    ece91cc0c [wforget] FlinkProcessBuilder prioritizes user configurations
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: wforget <[email protected]>
    (cherry picked from commit c8e645734b0cee93627741c24d3e9d8ff16a4e11)
    Signed-off-by: wforget <[email protected]>
---
 .../kyuubi/engine/KyuubiApplicationManager.scala   |  5 +++-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  | 17 +++++++++---
 .../engine/flink/FlinkProcessBuilderSuite.scala    | 31 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 1afdcc3cf..16a06a134 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -115,7 +115,10 @@ object KyuubiApplicationManager {
   }
 
   private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
-    val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + 
",").getOrElse("")
+    val originalTag = conf
+      
.getOption(s"${FlinkProcessBuilder.FLINK_CONF_PREFIX}.${FlinkProcessBuilder.YARN_TAG_KEY}")
+      .orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY))
+      .map(_ + ",").getOrElse("")
     val newTag = s"${originalTag}KYUUBI" + 
Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
     conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 4ae714dee..241b7ec78 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -115,10 +115,15 @@ class FlinkProcessBuilder(
           flinkExtraJars += s"$hiveConfFile"
         }
 
+        val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "")
+        // add custom yarn.ship-files
+        flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
+        val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
+          .orElse(conf.getOption(APP_KEY))
         buffer += "-t"
         buffer += "yarn-application"
         buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
-        buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
+        buffer += s"-Dyarn.application.name=${yarnAppName.get}"
         buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
         buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
 
@@ -126,8 +131,10 @@ class FlinkProcessBuilder(
           buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
         }
 
-        val customFlinkConf = conf.getAllWithPrefix("flink", "")
-        customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
+        customFlinkConf.filter { case (k, _) =>
+          !Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY, 
YARN_TAG_KEY)
+            .contains(k)
+        }.foreach { case (k, v) =>
           buffer += s"-D$k=$v"
         }
 
@@ -213,8 +220,12 @@ class FlinkProcessBuilder(
 
 object FlinkProcessBuilder {
   final val FLINK_EXEC_FILE = "flink"
+  final val FLINK_CONF_PREFIX = "flink"
   final val APP_KEY = "flink.app.name"
   final val YARN_TAG_KEY = "yarn.tags"
+  final val YARN_SHIP_FILES_KEY = "yarn.ship-files"
+  final val YARN_APPLICATION_NAME_KEY = "yarn.application.name"
+
   final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
   final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 88d3a2d14..383e132dc 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     }
     matchActualAndExpectedApplicationMode(builder)
   }
+
+  test("user configuration takes priority") {
+    val customShipFiles = "testFile1.jar;testFile2.jar"
+    val customAppName = "testAppName"
+    val customYarnTags = "testTag1,testTag2"
+    val builderConf = applicationModeConf
+    builderConf.set("flink.yarn.ship-files", customShipFiles)
+    builderConf.set("flink.yarn.application.name", customAppName)
+    builderConf.set("flink.yarn.tags", customYarnTags)
+    val builder = new FlinkProcessBuilder("test", true, builderConf) {
+      override def env: Map[String, String] = envWithAllHadoop
+    }
+    val actualCommands = builder.toString
+    // scalastyle:off line.size.limit
+    val expectedCommands =
+      escapePaths(
+        s"""${builder.flinkExecutable} run-application \\\\
+           |\\t-t yarn-application \\\\
+           
|\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles
 \\\\
+           |\\t-Dyarn.application.name=$customAppName \\\\
+           |\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\
+           |\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\
+           |\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\
+           |\\t-Dexecution.target=yarn-application \\\\
+           |\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine 
.*kyuubi-flink-sql-engine_.*jar""".stripMargin +
+          "(?: \\\\\\n\\t--conf \\S+=\\S+)+")
+    // scalastyle:on line.size.limit
+    val regex = new Regex(expectedCommands)
+    val matcher = regex.pattern.matcher(actualCommands)
+    assert(matcher.matches())
+  }
 }

Reply via email to