This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 5d9f5fbe6 [Improve] get flink config minor improvement
5d9f5fbe6 is described below

commit 5d9f5fbe6be65b25612002905439e7ebbf2a386b
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 15 20:44:34 2024 +0800

    [Improve] get flink config minor improvement
---
 .../flink/client/bean/SubmitRequest.scala          | 32 +++++++++++++---------
 .../impl/KubernetesNativeSessionClient.scala       |  3 ++
 .../flink/client/impl/YarnSessionClient.scala      |  2 ++
 .../flink/client/trait/FlinkClientTrait.scala      | 14 ++++++++++
 .../flink/core/FlinkStreamTableTrait.scala         |  2 +-
 .../apache/streampark/flink/cli/SqlClient.scala    |  2 +-
 6 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 49854e74e..4cdcf07c1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -102,19 +102,25 @@ case class SubmitRequest(
   lazy val flinkDefaultConfiguration: Configuration = {
     
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) 
match {
       case Success(value) =>
-        value
-          .keySet()
-          .foreach(
-            k => {
-              val v = value.getString(k, null)
-              if (v != null) {
-                val result = v
-                  .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
effectiveAppName)
-                  .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
-                value.setString(k, result)
-              }
-            })
-        value
+        executionMode match {
+          case ExecutionMode.YARN_SESSION | 
ExecutionMode.KUBERNETES_NATIVE_SESSION |
+              ExecutionMode.REMOTE =>
+            value
+          case _ =>
+            value
+              .keySet()
+              .foreach(
+                k => {
+                  val v = value.getString(k, null)
+                  if (v != null) {
+                    val result = v
+                      .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
effectiveAppName)
+                      .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", 
id.toString)
+                    value.setString(k, result)
+                  }
+                })
+            value
+        }
       case _ => new Configuration()
     }
   }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 51e6fd63d..4c3cf5712 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -150,6 +150,9 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
          |""".stripMargin)
 
     val flinkConfig = getFlinkK8sConfig(deployRequest)
+
+    replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", 
deployRequest.clusterName)
+
     val kubeClient = 
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
 
     var clusterDescriptor: KubernetesClusterDescriptor = null
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index eb49ad4a1..76d86b8b6 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -77,6 +77,8 @@ object YarnSessionClient extends YarnClientTrait {
       // app name
       .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
 
+    replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", 
deployRequest.clusterName)
+
     logInfo(s"""
                
|------------------------------------------------------------------
                |Effective submit configuration: $flinkConfig
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 896767ca3..d741df50f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -496,4 +496,18 @@ trait FlinkClientTrait extends Logger {
       })
   }
 
+  def replaceConfig(flinkConfig: Configuration, regexp: String, replacement: 
String): Unit = {
+    flinkConfig
+      .keySet()
+      .foreach(
+        k => {
+          val v = flinkConfig.getString(k, null)
+          if (v != null) {
+            val result = v
+              .replaceAll(regexp, replacement)
+            flinkConfig.setString(k, result)
+          }
+        })
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index b0a746005..47acb23ea 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -72,7 +72,7 @@ abstract class FlinkStreamTableTrait(
 
   /** Recommended to use this Api to start tasks */
   def start(name: String = null): JobExecutionResult = {
-    val appName = parameter.getAppName(name, true)
+    val appName = parameter.getAppName(name, required = true)
     execute(appName)
   }
 
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index c80d520f9..5410e026a 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -53,7 +53,7 @@ object SqlClient extends App {
 
   private[this] val mode = sets.find(_.operands.head == 
ExecutionOptions.RUNTIME_MODE.key()) match {
     case Some(e) =>
-      // 1) flink sql execution.runtime-mode has highest priority
+      // 1) flink sql execution.runtime-mode has the highest priority
       val m = e.operands(1).toUpperCase()
       arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
       m

Reply via email to