This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c844f1872 [KYUUBI #5200] [FLINK] Optimize Flink application name 
generating
c844f1872 is described below

commit c844f18723a4468078025525e93e41f2ae39e13a
Author: Paul Lin <[email protected]>
AuthorDate: Fri Sep 1 03:53:39 2023 +0800

    [KYUUBI #5200] [FLINK] Optimize Flink application name generating
    
    ### _Why are the changes needed?_
    
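    The generated application name does not take effect in Flink application mode,
    because it was set inside the engine itself (`FlinkSQLEngine`), which in
    application mode only starts after the application has already been submitted.
    This PR moves application name generation to the `ProcessBuilder` side:
    `EngineRef` sets a default `flink.app.name`, and `FlinkProcessBuilder` passes it
    to Flink as `yarn.application.name`.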
    
    The generated application name looks like
    `kyuubi_USER_FLINK_SQL_myuser_default_382c0371-8cc1-4aec-90bd-a2acf4de6fac`.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5200 from link3280/flink_app_name.
    
    Closes #5200
    
    bf06d1c16 [Paul Lin] Fix engine name udf test
    6aa09e462 [Paul Lin] Filter out unused conf in app mode
    957d18c42 [Paul Lin] Fix test error in local mode
    eaa5de9b4 [Paul Lin] Fix engine name missing in tests
    109ff46f5 [Paul Lin] Fix test error
    efb1cda82 [Paul Lin] Fix compatibility with YARN and local
    65e6759b2 [Paul Lin] Remove unused import
    49860f65e [Paul Lin] Optimize Flink application name generating
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       | 22 +++++-----------------
 .../engine/flink/WithFlinkSQLEngineOnYarn.scala    |  1 +
 .../flink/operation/FlinkOperationLocalSuite.scala |  3 ++-
 .../operation/FlinkOperationOnYarnSuite.scala      |  2 +-
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  2 +-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  5 +++--
 .../engine/flink/FlinkProcessBuilderSuite.scala    |  2 ++
 7 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 4b147d020..7e4f31f8a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.flink
 
 import java.io.File
 import java.nio.file.Paths
-import java.time.Instant
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
@@ -141,32 +140,21 @@ object FlinkSQLEngine extends Logging {
 
   private def setDeploymentConf(executionTarget: String, flinkConf: 
Configuration): Unit = {
     // forward kyuubi engine variables to flink configuration
-    val instant = Instant.now
-    val engineName = s"kyuubi_${user}_flink_$instant"
-    flinkConf.setString(KYUUBI_ENGINE_NAME, engineName)
+    kyuubiConf.getOption("flink.app.name")
+      .foreach(flinkConf.setString(KYUUBI_ENGINE_NAME, _))
 
-    kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).foreach(user =>
-      flinkConf.setString(KYUUBI_SESSION_USER_KEY, user))
+    kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY)
+      .foreach(flinkConf.setString(KYUUBI_SESSION_USER_KEY, _))
 
-    // set cluster name for per-job and application mode
     executionTarget match {
       case "yarn-per-job" | "yarn-application" =>
-        if (!flinkConf.containsKey("yarn.application.name")) {
-          val appName = engineName
-          flinkConf.setString("yarn.application.name", appName)
-        }
         if (flinkConf.containsKey("high-availability.cluster-id")) {
           flinkConf.setString(
             "yarn.application.id",
             flinkConf.toMap.get("high-availability.cluster-id"))
         }
-      case "kubernetes-application" =>
-        if (!flinkConf.containsKey("kubernetes.cluster-id")) {
-          val appName = s"kyuubi-${user}-flink-$instant"
-          flinkConf.setString("kubernetes.cluster-id", appName)
-        }
       case other =>
-        debug(s"Skip generating app name for execution target $other")
+        debug(s"Skip setting deployment conf for execution target $other")
     }
   }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
index 553574e65..49fb947a3 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -162,6 +162,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource
     command += "-t"
     command += "yarn-application"
     command += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+    command += s"-Dyarn.application.name=kyuubi_user_flink_paul"
     command += s"-Dyarn.tags=KYUUBI,$engineRefId"
     command += "-Djobmanager.memory.process.size=1g"
     command += "-Dtaskmanager.memory.process.size=1g"
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index b8f6768cc..279cbea22 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -36,6 +36,7 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
     Map(
       "flink.execution.target" -> "remote",
       "flink.high-availability.cluster-id" -> "flink-mini-cluster",
+      "flink.app.name" -> "kyuubi_connection_flink_paul",
       HA_NAMESPACE.key -> namespace,
       HA_ENGINE_REF_ID.key -> engineRefId,
       ENGINE_TYPE.key -> "FLINK_SQL",
@@ -60,7 +61,7 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
 
       resultSet = statement.executeQuery("select kyuubi_engine_name() as 
engine_name")
       assert(resultSet.next())
-      
assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+      assert(resultSet.getString(1).equals(s"kyuubi_connection_flink_paul"))
 
       resultSet = statement.executeQuery("select kyuubi_engine_id() as 
engine_id")
       assert(resultSet.next())
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index 25e23d82a..401c3b0bd 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -56,7 +56,7 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
 
       resultSet = statement.executeQuery("select kyuubi_engine_name() as 
engine_name")
       assert(resultSet.next())
-      
assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+      assert(resultSet.getString(1).equals(s"kyuubi_user_flink_paul"))
 
       resultSet = statement.executeQuery("select kyuubi_engine_id() as 
engine_id")
       assert(resultSet.next())
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 052f1de0f..160e7f39e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -190,7 +190,7 @@ private[kyuubi] class EngineRef(
         conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
         new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case FLINK_SQL =>
-        conf.setIfMissing(FlinkProcessBuilder.YARN_APP_KEY, defaultEngineName)
+        conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
         new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case TRINO =>
         new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 3da8d1b1a..716db0fcc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -106,11 +106,12 @@ class FlinkProcessBuilder(
         buffer += "-t"
         buffer += "yarn-application"
         buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+        buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
         buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
         buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
 
         val customFlinkConf = conf.getAllWithPrefix("flink", "")
-        customFlinkConf.foreach { case (k, v) =>
+        customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
           buffer += s"-D$k=$v"
         }
 
@@ -203,7 +204,7 @@ class FlinkProcessBuilder(
 
 object FlinkProcessBuilder {
   final val FLINK_EXEC_FILE = "flink"
-  final val YARN_APP_KEY = "yarn.application.name"
+  final val APP_KEY = "flink.app.name"
   final val YARN_TAG_KEY = "yarn.tags"
   final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
   final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 45272618d..25295c374 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -41,6 +41,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def applicationModeConf = KyuubiConf()
     .set("flink.execution.target", "yarn-application")
     .set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
+    .set(APP_KEY, "kyuubi_connection_flink_paul")
     .set("kyuubi.on", "off")
 
   private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
@@ -85,6 +86,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
       escapePaths(s"${builder.flinkExecutable} run-application ") +
         s"-t yarn-application " +
         
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar
 " +
+        s"-Dyarn\\.application\\.name=kyuubi_.* " +
         s"-Dyarn\\.tags=KYUUBI " +
         s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
         s"-Dexecution.target=yarn-application " +

Reply via email to