This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new aaf7c7422 Support the ${jobName} variable in Flink configuration. (#4062)
aaf7c7422 is described below

commit aaf7c7422300cac274ad370ff14c0eda0e05ddf6
Author: Darcy <[email protected]>
AuthorDate: Sun Sep 15 11:43:28 2024 +0800

    Support the ${jobName} variable in Flink configuration. (#4062)
    
    * feature: support jobName var in flink conf
    
    * fix issue
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/utils/CommonUtils.java       | 26 ++++++++++++++++++++++
 .../streampark/console/core/entity/FlinkEnv.java   |  8 +++++--
 .../core/service/impl/ApplicationServiceImpl.java  |  2 +-
 .../core/service/impl/SavepointServiceImpl.java    |  5 +++--
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../impl/KubernetesNativeSessionClient.scala       |  1 -
 .../flink/client/trait/FlinkClientTrait.scala      | 20 +++++++++++++----
 7 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
new file mode 100644
index 000000000..eb022491c
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.utils;
+
+public class CommonUtils {
+  private CommonUtils() {}
+
+  public static String fixedValueBaseVar(String configValue, String jobName) {
+    return configValue.replace("${jobName}", jobName);
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b1ed53da8..4662aeb6e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.utils.CommonUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
@@ -115,11 +116,14 @@ public class FlinkEnv implements Serializable {
   }
 
   @JsonIgnore
-  public Properties getFlinkConfig() {
+  public Properties getFlinkConfig(Application application) {
     String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
     Properties flinkConfig = new Properties();
     Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
-    flinkConfig.putAll(config);
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String value = CommonUtils.fixedValueBaseVar(entry.getValue(), application.getJobName());
+      flinkConfig.setProperty(entry.getKey(), value);
+    }
     return flinkConfig;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2062baee3..881706aca 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1820,7 +1820,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
       String archiveDir =
-          flinkEnv.getFlinkConfig().getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+          flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
       if (archiveDir != null) {
         properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
       }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index a1455b281..ebefe6197 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -155,7 +155,8 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     }
 
     if (cpThreshold == 0) {
-      String flinkConfNumRetained = flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
+      String flinkConfNumRetained =
+          flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
       int numRetainedDefaultValue = 1;
       if (flinkConfNumRetained != null) {
         try {
@@ -292,7 +293,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     if (StringUtils.isBlank(savepointPath)) {
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
-      Properties flinkConfig = flinkEnv.getFlinkConfig();
+      Properties flinkConfig = flinkEnv.getFlinkConfig(application);
       savepointPath =
           flinkConfig.getProperty(
               CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index c449b4e21..a09f85ef9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -119,7 +119,7 @@ public class FlinkK8sWatcherWrapper {
 
   public TrackId toTrackId(Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
-    Properties properties = flinkEnv.getFlinkConfig();
+    Properties properties = flinkEnv.getFlinkConfig(app);
 
     Map<String, String> dynamicProperties =
         PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 4bd37db9a..51e6fd63d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -27,7 +27,6 @@ import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
-import io.fabric8.kubernetes.api.model.{Config => _}
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 675b0ea08..ff7c7304c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -376,7 +376,12 @@ trait FlinkClientTrait extends Logger {
       commandLine)
 
     val configuration =
-      applyConfiguration(submitRequest.flinkVersion.flinkHome, activeCommandLine, commandLine)
+      applyConfiguration(
+        submitRequest.flinkVersion.flinkHome,
+        activeCommandLine,
+        commandLine,
+        submitRequest.id.toString,
+        submitRequest.effectiveAppName)
 
     commandLine -> configuration
 
@@ -443,7 +448,9 @@ trait FlinkClientTrait extends Logger {
   private[this] def applyConfiguration(
       flinkHome: String,
       activeCustomCommandLine: CustomCommandLine,
-      commandLine: CommandLine): Configuration = {
+      commandLine: CommandLine,
+      jobId: String = null,
+      jobName: String = null): Configuration = {
 
     require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.")
     val configuration = new Configuration()
@@ -451,9 +458,14 @@ trait FlinkClientTrait extends Logger {
     flinkDefaultConfiguration.keySet.foreach(
       key => {
         val value = flinkDefaultConfiguration.getString(key, null)
-        if (value != null) {
-          configuration.setString(key, value)
+        var result = value
+        if (value != null && StringUtils.isNotBlank(jobName)) {
+          result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
         }
+        if (jobId != null) {
+          result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
+        }
+        configuration.setString(key, result)
       })
     configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
     configuration

Reply via email to