This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c844f1872 [KYUUBI #5200] [FLINK] Optimize Flink application name
generating
c844f1872 is described below
commit c844f18723a4468078025525e93e41f2ae39e13a
Author: Paul Lin <[email protected]>
AuthorDate: Fri Sep 1 03:53:39 2023 +0800
[KYUUBI #5200] [FLINK] Optimize Flink application name generating
### _Why are the changes needed?_
The application name generated inside the engine does not take effect in Flink
application mode. This PR moves the name generation to the `ProcessBuilder`.
The generated app name would look like
`kyuubi_USER_FLINK_SQL_myuser_default_382c0371-8cc1-4aec-90bd-a2acf4de6fac`.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5200 from link3280/flink_app_name.
Closes #5200
bf06d1c16 [Paul Lin] Fix engine name udf test
6aa09e462 [Paul Lin] Filter out unused conf in app mode
957d18c42 [Paul Lin] Fix test error in local mode
eaa5de9b4 [Paul Lin] Fix engine name missing in tests
109ff46f5 [Paul Lin] Fix test error
efb1cda82 [Paul Lin] Fix compatibility with YARN and local
65e6759b2 [Paul Lin] Remove unused import
49860f65e [Paul Lin] Optimize Flink application name generating
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 22 +++++-----------------
.../engine/flink/WithFlinkSQLEngineOnYarn.scala | 1 +
.../flink/operation/FlinkOperationLocalSuite.scala | 3 ++-
.../operation/FlinkOperationOnYarnSuite.scala | 2 +-
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 5 +++--
.../engine/flink/FlinkProcessBuilderSuite.scala | 2 ++
7 files changed, 15 insertions(+), 22 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 4b147d020..7e4f31f8a 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.flink
import java.io.File
import java.nio.file.Paths
-import java.time.Instant
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
@@ -141,32 +140,21 @@ object FlinkSQLEngine extends Logging {
private def setDeploymentConf(executionTarget: String, flinkConf:
Configuration): Unit = {
// forward kyuubi engine variables to flink configuration
- val instant = Instant.now
- val engineName = s"kyuubi_${user}_flink_$instant"
- flinkConf.setString(KYUUBI_ENGINE_NAME, engineName)
+ kyuubiConf.getOption("flink.app.name")
+ .foreach(flinkConf.setString(KYUUBI_ENGINE_NAME, _))
- kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).foreach(user =>
- flinkConf.setString(KYUUBI_SESSION_USER_KEY, user))
+ kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY)
+ .foreach(flinkConf.setString(KYUUBI_SESSION_USER_KEY, _))
- // set cluster name for per-job and application mode
executionTarget match {
case "yarn-per-job" | "yarn-application" =>
- if (!flinkConf.containsKey("yarn.application.name")) {
- val appName = engineName
- flinkConf.setString("yarn.application.name", appName)
- }
if (flinkConf.containsKey("high-availability.cluster-id")) {
flinkConf.setString(
"yarn.application.id",
flinkConf.toMap.get("high-availability.cluster-id"))
}
- case "kubernetes-application" =>
- if (!flinkConf.containsKey("kubernetes.cluster-id")) {
- val appName = s"kyuubi-${user}-flink-$instant"
- flinkConf.setString("kubernetes.cluster-id", appName)
- }
case other =>
- debug(s"Skip generating app name for execution target $other")
+ debug(s"Skip setting deployment conf for execution target $other")
}
}
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
index 553574e65..49fb947a3 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -162,6 +162,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with
WithFlinkTestResource
command += "-t"
command += "yarn-application"
command += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+ command += s"-Dyarn.application.name=kyuubi_user_flink_paul"
command += s"-Dyarn.tags=KYUUBI,$engineRefId"
command += "-Djobmanager.memory.process.size=1g"
command += "-Dtaskmanager.memory.process.size=1g"
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index b8f6768cc..279cbea22 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -36,6 +36,7 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
Map(
"flink.execution.target" -> "remote",
"flink.high-availability.cluster-id" -> "flink-mini-cluster",
+ "flink.app.name" -> "kyuubi_connection_flink_paul",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
@@ -60,7 +61,7 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
resultSet = statement.executeQuery("select kyuubi_engine_name() as
engine_name")
assert(resultSet.next())
-
assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+ assert(resultSet.getString(1).equals(s"kyuubi_connection_flink_paul"))
resultSet = statement.executeQuery("select kyuubi_engine_id() as
engine_id")
assert(resultSet.next())
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index 25e23d82a..401c3b0bd 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -56,7 +56,7 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
resultSet = statement.executeQuery("select kyuubi_engine_name() as
engine_name")
assert(resultSet.next())
-
assert(resultSet.getString(1).startsWith(s"kyuubi_${Utils.currentUser}_flink"))
+ assert(resultSet.getString(1).equals(s"kyuubi_user_flink_paul"))
resultSet = statement.executeQuery("select kyuubi_engine_id() as
engine_id")
assert(resultSet.next())
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 052f1de0f..160e7f39e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -190,7 +190,7 @@ private[kyuubi] class EngineRef(
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case FLINK_SQL =>
- conf.setIfMissing(FlinkProcessBuilder.YARN_APP_KEY, defaultEngineName)
+ conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 3da8d1b1a..716db0fcc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -106,11 +106,12 @@ class FlinkProcessBuilder(
buffer += "-t"
buffer += "yarn-application"
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+ buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
val customFlinkConf = conf.getAllWithPrefix("flink", "")
- customFlinkConf.foreach { case (k, v) =>
+ customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
buffer += s"-D$k=$v"
}
@@ -203,7 +204,7 @@ class FlinkProcessBuilder(
object FlinkProcessBuilder {
final val FLINK_EXEC_FILE = "flink"
- final val YARN_APP_KEY = "yarn.application.name"
+ final val APP_KEY = "flink.app.name"
final val YARN_TAG_KEY = "yarn.tags"
final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 45272618d..25295c374 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -41,6 +41,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def applicationModeConf = KyuubiConf()
.set("flink.execution.target", "yarn-application")
.set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
+ .set(APP_KEY, "kyuubi_connection_flink_paul")
.set("kyuubi.on", "off")
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
@@ -85,6 +86,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
escapePaths(s"${builder.flinkExecutable} run-application ") +
s"-t yarn-application " +
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar
" +
+ s"-Dyarn\\.application\\.name=kyuubi_.* " +
s"-Dyarn\\.tags=KYUUBI " +
s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
s"-Dexecution.target=yarn-application " +