This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-jobId
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 5241b32b9792ee033303740dbef861fabdbebfbe
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 15 12:07:24 2024 +0800

    [Improve] support the variable in Flink configuration
---
 .../streampark/common/utils/CommonUtils.java       | 26 -------
 .../streampark/console/core/entity/FlinkEnv.java   | 14 ----
 .../console/core/service/FlinkEnvService.java      |  4 ++
 .../core/service/impl/ApplicationServiceImpl.java  |  4 +-
 .../core/service/impl/FlinkEnvServiceImpl.java     | 26 +++++++
 .../core/service/impl/SavepointServiceImpl.java    | 10 ++-
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../flink/client/bean/SubmitRequest.scala          | 23 +++++-
 .../flink/client/impl/YarnSessionClient.scala      | 14 ++--
 .../flink/client/trait/FlinkClientTrait.scala      | 81 +++++++---------------
 10 files changed, 91 insertions(+), 113 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
deleted file mode 100644
index eb022491c..000000000
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.common.utils;
-
-public class CommonUtils {
-  private CommonUtils() {}
-
-  public static String fixedValueBaseVar(String configValue, String jobName) {
-    return configValue.replace("${jobName}", jobName);
-  }
-}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 4662aeb6e..9ccf90773 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.utils.CommonUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
@@ -38,7 +37,6 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
-import java.util.Properties;
 
 @Getter
 @Setter
@@ -115,18 +113,6 @@ public class FlinkEnv implements Serializable {
     return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
   }
 
-  @JsonIgnore
-  public Properties getFlinkConfig(Application application) {
-    String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
-    Properties flinkConfig = new Properties();
-    Map<String, String> config = 
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
-    for (Map.Entry<String, String> entry : config.entrySet()) {
-      String value = CommonUtils.fixedValueBaseVar(entry.getValue(), 
application.getJobName());
-      flinkConfig.setProperty(entry.getKey(), value);
-    }
-    return flinkConfig;
-  }
-
   @JsonIgnore
   public FlinkVersion getFlinkVersion() {
     if (this.flinkVersion == null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index bbcbe8af8..53930320c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -18,12 +18,14 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.io.IOException;
+import java.util.Properties;
 
 public interface FlinkEnvService extends IService<FlinkEnv> {
 
@@ -97,4 +99,6 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
   void validity(Long id);
 
   IPage<FlinkEnv> findPage(FlinkEnv flinkEnv, RestRequest restRequest);
+
+  Properties getFlinkConfig(FlinkEnv flinkEnv, Application application);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 881706aca..a15aefaee 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1820,7 +1820,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
       String archiveDir =
-          
flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+          flinkEnvService
+              .getFlinkConfig(flinkEnv, application)
+              .getProperty(JobManagerOptions.ARCHIVE_DIR.key());
       if (archiveDir != null) {
         properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 9a71f896c..1f97dd6a0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -17,9 +17,12 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.Project;
 import org.apache.streampark.console.core.mapper.FlinkEnvMapper;
@@ -27,6 +30,8 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -40,6 +45,8 @@ import 
org.springframework.transaction.annotation.Transactional;
 import java.io.File;
 import java.io.IOException;
 import java.util.Date;
+import java.util.Map;
+import java.util.Properties;
 
 @Slf4j
 @Service
@@ -164,6 +171,25 @@ public class FlinkEnvServiceImpl extends 
ServiceImpl<FlinkEnvMapper, FlinkEnv>
     return this.baseMapper.findPage(page, flinkEnv);
   }
 
+  @Override
+  public Properties getFlinkConfig(FlinkEnv flinkEnv, Application application) 
{
+    String flinkYamlString = 
DeflaterUtils.unzipString(flinkEnv.getFlinkConf());
+    Properties flinkConfig = new Properties();
+    Map<String, String> config = 
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String value = entry.getValue();
+      if (StringUtils.isNotBlank(application.getJobName())) {
+        value =
+            value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
application.getJobName());
+      }
+      if (application.getId() != null) {
+        value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", 
application.getId().toString());
+      }
+      flinkConfig.setProperty(entry.getKey(), value);
+    }
+    return flinkConfig;
+  }
+
   private void checkOrElseAlert(FlinkEnv flinkEnv) {
 
     // 1.check exists
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ebefe6197..cfc112842 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -156,7 +156,7 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
 
     if (cpThreshold == 0) {
       String flinkConfNumRetained =
-          flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
+          flinkEnvService.getFlinkConfig(flinkEnv, 
application).getProperty(numRetainedKey);
       int numRetainedDefaultValue = 1;
       if (flinkConfNumRetained != null) {
         try {
@@ -293,7 +293,7 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
     if (StringUtils.isBlank(savepointPath)) {
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
-      Properties flinkConfig = flinkEnv.getFlinkConfig(application);
+      Properties flinkConfig = flinkEnvService.getFlinkConfig(flinkEnv, 
application);
       savepointPath =
           flinkConfig.getProperty(
               CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
@@ -306,10 +306,8 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
   @Override
   public String processPath(String path, String jobName, Long jobId) {
     if (StringUtils.isNotBlank(path)) {
-      return path.replaceAll("\\$job(Id|id)", jobId.toString())
-          .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
-          .replaceAll("\\$job(Name|name)", jobName)
-          .replaceAll("\\$\\{job(Name|name)}", jobName);
+      return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
jobName)
+          .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
     }
     return path;
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index a09f85ef9..123c399a5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -119,7 +119,7 @@ public class FlinkK8sWatcherWrapper {
 
   public TrackId toTrackId(Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
-    Properties properties = flinkEnv.getFlinkConfig(app);
+    Properties properties = flinkEnvService.getFlinkConfig(flinkEnv, app);
 
     Map<String, String> dynamicProperties =
         
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 86bc2bf8d..49854e74e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.commons.io.FileUtils
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
 import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, 
SavepointRestoreSettings}
 
 import javax.annotation.Nullable
@@ -34,7 +35,7 @@ import java.io.File
 import java.util.{Map => JavaMap}
 
 import scala.collection.JavaConversions._
-import scala.util.Try
+import scala.util.{Success, Try}
 
 case class SubmitRequest(
     flinkVersion: FlinkVersion,
@@ -98,6 +99,26 @@ case class SubmitRequest(
     }
   }
 
+  lazy val flinkDefaultConfiguration: Configuration = {
+    
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) 
match {
+      case Success(value) =>
+        value
+          .keySet()
+          .foreach(
+            k => {
+              val v = value.getString(k, null)
+              if (v != null) {
+                val result = v
+                  .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
effectiveAppName)
+                  .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
+                value.setString(k, result)
+              }
+            })
+        value
+      case _ => new Configuration()
+    }
+  }
+
   def hasProp(key: String): Boolean = properties.containsKey(key)
 
   def getProp(key: String): Any = properties.get(key)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index db0c3db71..eb49ad4a1 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -184,15 +184,11 @@ object YarnSessionClient extends YarnClientTrait {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {
-      val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.properties.foreach(
-        m =>
-          m._2 match {
-            case v if v != null => flinkConfig.setString(m._1, m._2.toString)
-            case _ =>
-          })
-      flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
shutDownRequest.clusterId)
-      flinkConfig.safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
+      val flinkConfig = new Configuration()
+      flinkConfig
+        .safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
+        .safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
+        .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
       val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
       val applicationId: ApplicationId = yarnClusterDescriptor._1
       clusterDescriptor = yarnClusterDescriptor._2
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index ff7c7304c..896767ca3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -114,11 +114,11 @@ trait FlinkClientTrait extends Logger {
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
 
     if 
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
-      val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-        submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained
       val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
-      flinkConfig.safeSet(retainedOption, 
flinkDefaultConfiguration.get(retainedOption))
+      flinkConfig.safeSet(
+        retainedOption,
+        submitRequest.flinkDefaultConfiguration.get(retainedOption))
     }
 
     // 2) set savepoint parameter
@@ -279,29 +279,20 @@ trait FlinkClientTrait extends Logger {
     throw new IllegalStateException("No valid command-line found.")
   }
 
-  private[client] def getFlinkDefaultConfiguration(flinkHome: String): 
Configuration = {
-    
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new 
Configuration())
-  }
-
-  private[client] def getOptionFromDefaultFlinkConfig[T](
-      flinkHome: String,
-      option: ConfigOption[T]): T = {
-    getFlinkDefaultConfiguration(flinkHome).get(option)
-  }
-
   private[this] def getCustomCommandLines(flinkHome: String): 
JavaList[CustomCommandLine] = {
-    val flinkDefaultConfiguration: Configuration = 
getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
     val configurationDirectory = s"$flinkHome/conf"
     // 2. load the custom command lines
-    loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
+    val flinkConfig =
+      
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new 
Configuration())
+    loadCustomCommandLines(flinkConfig, configurationDirectory)
   }
 
   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
     if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
       
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
-      getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
+      submitRequest.flinkDefaultConfiguration
         .getInteger(CoreOptions.DEFAULT_PARALLELISM, 
CoreOptions.DEFAULT_PARALLELISM.defaultValue())
     }
   }
@@ -371,20 +362,20 @@ trait FlinkClientTrait extends Logger {
 
     val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)
 
-    val activeCommandLine = validateAndGetActiveCommandLine(
-      getCustomCommandLines(submitRequest.flinkVersion.flinkHome),
-      commandLine)
+    val activeCommandLine = {
+      val customCommandLines: JavaList[CustomCommandLine] = {
+        // 1. find the configuration directory
+        val configurationDirectory = 
s"${submitRequest.flinkVersion.flinkHome}/conf"
+        // 2. load the custom command lines
+        loadCustomCommandLines(submitRequest.flinkDefaultConfiguration, 
configurationDirectory)
+      }
+      validateAndGetActiveCommandLine(customCommandLines, commandLine)
+    }
 
-    val configuration =
-      applyConfiguration(
-        submitRequest.flinkVersion.flinkHome,
-        activeCommandLine,
-        commandLine,
-        submitRequest.id.toString,
-        submitRequest.effectiveAppName)
+    val configuration = new 
Configuration(submitRequest.flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))
 
     commandLine -> configuration
-
   }
 
   private[client] def getCommandLineOptions(flinkHome: String) = {
@@ -417,10 +408,16 @@ trait FlinkClientTrait extends Logger {
       }
       FlinkRunOption.parse(commandLineOptions, cliArgs, true)
     }
+
     val activeCommandLine =
       validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), 
commandLine)
-    val flinkConfig = applyConfiguration(flinkHome, activeCommandLine, 
commandLine)
-    flinkConfig
+
+    val flinkDefaultConfiguration =
+      
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new 
Configuration())
+
+    val configuration = new Configuration(flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))
+    configuration
   }
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): 
JavaList[String] = {
@@ -445,32 +442,6 @@ trait FlinkClientTrait extends Logger {
     Lists.newArrayList(programArgs: _*)
   }
 
-  private[this] def applyConfiguration(
-      flinkHome: String,
-      activeCustomCommandLine: CustomCommandLine,
-      commandLine: CommandLine,
-      jobId: String = null,
-      jobName: String = null): Configuration = {
-
-    require(activeCustomCommandLine != null, "activeCustomCommandLine must not 
be null.")
-    val configuration = new Configuration()
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
-    flinkDefaultConfiguration.keySet.foreach(
-      key => {
-        val value = flinkDefaultConfiguration.getString(key, null)
-        var result = value
-        if (value != null && StringUtils.isNotBlank(jobName)) {
-          result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
jobName)
-        }
-        if (jobId != null) {
-          result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
-        }
-        configuration.setString(key, result)
-      })
-    configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
-    configuration
-  }
-
   implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: 
Configuration) {
     def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
       flinkConfig match {

Reply via email to