This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 072998b4f [Improve] support the variable in Flink configuration (#4065)
072998b4f is described below

commit 072998b4f365c7670ec00fa9299f774a7468d8cd
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 15 15:06:08 2024 +0800

    [Improve] support the variable in Flink configuration (#4065)
    
    * [Improve] support the variable in Flink configuration
    
    * [Improve] program args bug fixed.
    
    * [Improve] modal style improvement
---
 .../streampark/common/utils/CommonUtils.java       | 26 -------
 .../streampark/console/core/entity/FlinkEnv.java   | 14 ----
 .../console/core/service/FlinkEnvService.java      |  4 ++
 .../core/service/impl/ApplicationServiceImpl.java  |  4 +-
 .../core/service/impl/FlinkEnvServiceImpl.java     | 26 +++++++
 .../core/service/impl/SavepointServiceImpl.java    | 10 ++-
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../src/design/index.less                          | 12 ++++
 .../src/design/public.less                         |  5 --
 .../components/AppView/StartApplicationModal.vue   |  8 +--
 .../components/AppView/StopApplicationModal.vue    |  7 +-
 .../src/views/flink/app/hooks/useApp.tsx           | 10 +--
 .../src/views/flink/app/hooks/useFlinkRender.tsx   |  2 +-
 .../src/views/flink/app/styles/Add.less            |  1 +
 .../flink/variable/components/VariableModal.vue    | 10 +--
 .../views/setting/Alarm/components/AlertModal.vue  |  2 +-
 .../setting/ExternalLink/components/Modal.vue      |  2 +-
 .../views/setting/FlinkHome/components/Modal.vue   | 11 +--
 .../flink/client/bean/SubmitRequest.scala          | 23 +++++-
 .../flink/client/impl/YarnSessionClient.scala      | 14 ++--
 .../flink/client/trait/FlinkClientTrait.scala      | 81 +++++++---------------
 .../flink/core/FlinkTableInitializer.scala         |  1 +
 22 files changed, 121 insertions(+), 154 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
deleted file mode 100644
index eb022491c..000000000
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.common.utils;
-
-public class CommonUtils {
-  private CommonUtils() {}
-
-  public static String fixedValueBaseVar(String configValue, String jobName) {
-    return configValue.replace("${jobName}", jobName);
-  }
-}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 4662aeb6e..9ccf90773 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.utils.CommonUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
@@ -38,7 +37,6 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
-import java.util.Properties;
 
 @Getter
 @Setter
@@ -115,18 +113,6 @@ public class FlinkEnv implements Serializable {
     return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
   }
 
-  @JsonIgnore
-  public Properties getFlinkConfig(Application application) {
-    String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
-    Properties flinkConfig = new Properties();
-    Map<String, String> config = 
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
-    for (Map.Entry<String, String> entry : config.entrySet()) {
-      String value = CommonUtils.fixedValueBaseVar(entry.getValue(), 
application.getJobName());
-      flinkConfig.setProperty(entry.getKey(), value);
-    }
-    return flinkConfig;
-  }
-
   @JsonIgnore
   public FlinkVersion getFlinkVersion() {
     if (this.flinkVersion == null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index bbcbe8af8..53930320c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -18,12 +18,14 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.io.IOException;
+import java.util.Properties;
 
 public interface FlinkEnvService extends IService<FlinkEnv> {
 
@@ -97,4 +99,6 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
   void validity(Long id);
 
   IPage<FlinkEnv> findPage(FlinkEnv flinkEnv, RestRequest restRequest);
+
+  Properties getFlinkConfig(FlinkEnv flinkEnv, Application application);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 881706aca..a15aefaee 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1820,7 +1820,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
       String archiveDir =
-          
flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+          flinkEnvService
+              .getFlinkConfig(flinkEnv, application)
+              .getProperty(JobManagerOptions.ARCHIVE_DIR.key());
       if (archiveDir != null) {
         properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 9a71f896c..1f97dd6a0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -17,9 +17,12 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.Project;
 import org.apache.streampark.console.core.mapper.FlinkEnvMapper;
@@ -27,6 +30,8 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -40,6 +45,8 @@ import 
org.springframework.transaction.annotation.Transactional;
 import java.io.File;
 import java.io.IOException;
 import java.util.Date;
+import java.util.Map;
+import java.util.Properties;
 
 @Slf4j
 @Service
@@ -164,6 +171,25 @@ public class FlinkEnvServiceImpl extends 
ServiceImpl<FlinkEnvMapper, FlinkEnv>
     return this.baseMapper.findPage(page, flinkEnv);
   }
 
+  @Override
+  public Properties getFlinkConfig(FlinkEnv flinkEnv, Application application) 
{
+    String flinkYamlString = 
DeflaterUtils.unzipString(flinkEnv.getFlinkConf());
+    Properties flinkConfig = new Properties();
+    Map<String, String> config = 
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String value = entry.getValue();
+      if (StringUtils.isNotBlank(application.getJobName())) {
+        value =
+            value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
application.getJobName());
+      }
+      if (application.getId() != null) {
+        value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", 
application.getId().toString());
+      }
+      flinkConfig.setProperty(entry.getKey(), value);
+    }
+    return flinkConfig;
+  }
+
   private void checkOrElseAlert(FlinkEnv flinkEnv) {
 
     // 1.check exists
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ebefe6197..cfc112842 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -156,7 +156,7 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
 
     if (cpThreshold == 0) {
       String flinkConfNumRetained =
-          flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
+          flinkEnvService.getFlinkConfig(flinkEnv, 
application).getProperty(numRetainedKey);
       int numRetainedDefaultValue = 1;
       if (flinkConfNumRetained != null) {
         try {
@@ -293,7 +293,7 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
     if (StringUtils.isBlank(savepointPath)) {
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
-      Properties flinkConfig = flinkEnv.getFlinkConfig(application);
+      Properties flinkConfig = flinkEnvService.getFlinkConfig(flinkEnv, 
application);
       savepointPath =
           flinkConfig.getProperty(
               CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
@@ -306,10 +306,8 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
   @Override
   public String processPath(String path, String jobName, Long jobId) {
     if (StringUtils.isNotBlank(path)) {
-      return path.replaceAll("\\$job(Id|id)", jobId.toString())
-          .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
-          .replaceAll("\\$job(Name|name)", jobName)
-          .replaceAll("\\$\\{job(Name|name)}", jobName);
+      return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
jobName)
+          .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
     }
     return path;
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index a09f85ef9..123c399a5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -119,7 +119,7 @@ public class FlinkK8sWatcherWrapper {
 
   public TrackId toTrackId(Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
-    Properties properties = flinkEnv.getFlinkConfig(app);
+    Properties properties = flinkEnvService.getFlinkConfig(flinkEnv, app);
 
     Map<String, String> dynamicProperties =
         
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
diff --git a/streampark-console/streampark-console-webapp/src/design/index.less 
b/streampark-console/streampark-console-webapp/src/design/index.less
index d787ae632..6cfb2301b 100644
--- a/streampark-console/streampark-console-webapp/src/design/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/index.less
@@ -115,3 +115,15 @@ textarea.ant-input,
 .ant-upload.ant-upload-drag {
   border-radius: 1px !important;
 }
+
+.pop-tip {
+  display: inline-block;
+  margin-top: 5px;
+  color: darkgrey;
+}
+
+[data-theme='dark'] {
+  .pop-tip {
+    color: #666;
+  }
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/design/public.less 
b/streampark-console/streampark-console-webapp/src/design/public.less
index 547c88b53..ba112f7d1 100644
--- a/streampark-console/streampark-console-webapp/src/design/public.less
+++ b/streampark-console/streampark-console-webapp/src/design/public.less
@@ -56,8 +56,3 @@
     opacity: 0.75;
   }
 }
-
-.extra .conf-switch {
-  color: darkgrey;
-  margin-left: 8px;
-}
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 6aef8c2f4..7e8e27421 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -75,8 +75,8 @@
           checkedChildren: 'ON',
           unCheckedChildren: 'OFF',
         },
-        defaultValue: true,
-        afterItem: () => h('span', { class: 'conf-switch' }, 
t('flink.app.view.savepointTip')),
+        defaultValue: receiveData.historySavePoint && 
receiveData.historySavePoint.length,
+        afterItem: () => h('span', { class: 'pop-tip' }, 
t('flink.app.view.savepointTip')),
       },
       {
         field: 'savepointPath',
@@ -86,7 +86,7 @@
             ? 'Select'
             : 'Input',
         afterItem: () =>
-          h('span', { class: 'conf-switch' }, 
handleSavePointTip(receiveData.historySavePoint)),
+          h('span', { class: 'pop-tip' }, 
handleSavePointTip(receiveData.historySavePoint)),
         slot: 'savepoint',
         ifShow: ({ values }) => values.restoreSavepoint,
         required: true,
@@ -99,7 +99,7 @@
           checkedChildren: 'ON',
           unCheckedChildren: 'OFF',
         },
-        afterItem: () => h('span', { class: 'conf-switch' }, 
t('flink.app.view.ignoreRestoredTip')),
+        afterItem: () => h('span', { class: 'pop-tip' }, 
t('flink.app.view.ignoreRestoredTip')),
         defaultValue: false,
         ifShow: ({ values }) => values.restoreSavepoint,
       },
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index 5056dea35..4b4a09b33 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -54,9 +54,8 @@
           checkedChildren: 'ON',
           unCheckedChildren: 'OFF',
         },
-        defaultValue: true,
-        afterItem: () =>
-          h('span', { class: 'conf-switch' }, 
t('flink.app.operation.enableSavePoint')),
+        defaultValue: false,
+        afterItem: () => h('span', { class: 'pop-tip' }, 
t('flink.app.operation.enableSavePoint')),
       },
       {
         field: 'customSavepoint',
@@ -78,7 +77,7 @@
         },
         defaultValue: false,
         ifShow: ({ values }) => !!values.triggerSavepoint,
-        afterItem: () => h('span', { class: 'conf-switch' }, 
t('flink.app.operation.enableDrain')),
+        afterItem: () => h('span', { class: 'pop-tip' }, 
t('flink.app.operation.enableDrain')),
       },
     ],
     colon: true,
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index b0b627545..6ed5bfe92 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -226,11 +226,7 @@ export const useFlinkApplication = (openStartModal: Fn) => 
{
       ],
       content: () => {
         return (
-          <Form
-            class="!pt-40px"
-            layout='vertical'
-            baseColProps = {{ span: 20, offset: 2 }}
-          >
+          <Form class="!pt-40px" layout="vertical" baseColProps={{ span: 20, 
offset: 2 }}>
             <Form.Item
               label="Job Name"
               validateStatus={unref(validateStatus)}
@@ -321,8 +317,8 @@ export const useFlinkApplication = (openStartModal: Fn) => {
             class="!pt-40px"
             ref={mappingRef}
             name="mappingForm"
-            baseColProps = {{ span: 20, offset: 2 }}
-            layout='vertical'
+            baseColProps={{ span: 20, offset: 2 }}
+            layout="vertical"
             v-model:model={formValue}
           >
             <Form.Item label="Job Name">
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 07e222003..61a2ade8f 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -232,7 +232,7 @@ export const renderOptionsItems = (
             rules={[{ validator: conf.validator }]}
           />
         )}
-        {conf.type === 'switch' && <span 
class="conf-switch">({conf.placeholder})</span>}
+        {conf.type === 'switch' && <span>({conf.placeholder})</span>}
         <p class="conf-desc"> {descriptionFilter(conf)} </p>
       </Form.Item>
     );
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
index 5159a19b8..60c599971 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
@@ -75,6 +75,7 @@
   .conf-desc {
     color: darkgrey;
     margin-bottom: 0;
+    margin-top: 5px;
   }
 
   .sql-desc {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
index f4cdafae5..f2f71ed2e 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
@@ -120,7 +120,7 @@
         },
         defaultValue: false,
         afterItem: () =>
-          h('span', { class: 'conf-switch' }, 
t('flink.variable.form.desensitizationDesc')),
+          h('span', { class: 'pop-tip' }, 
t('flink.variable.form.desensitizationDesc')),
       },
       {
         field: 'description',
@@ -188,11 +188,3 @@
     }
   }
 </script>
-
-<style lang="less">
-  .conf-switch {
-    display: inline-block;
-    margin-top: 10px;
-    color: darkgrey;
-  }
-</style>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
index 287f1c123..161639a20 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
@@ -56,7 +56,7 @@
           allowClear: true,
           placeholder: t('setting.alarm.alertNamePlaceHolder'),
         },
-        afterItem: () => h('span', { class: 'conf-switch' }, 
t('setting.alarm.alertNameTips')),
+        afterItem: () => h('span', { class: 'pop-tip' }, 
t('setting.alarm.alertNameTips')),
         dynamicRules: () => {
           return [
             {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
index 786b65a88..10cff60ff 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
@@ -80,7 +80,7 @@
         afterItem: () =>
           h(
             'span',
-            { class: 'conf-switch' },
+            { class: 'pop-tip' },
             'Supported variables: {job_id}, {yarn_id}, {job_name}, Example: 
https://grafana/flink-monitoring?var-JobId=var-JobId={job_id}',
           ),
         rules: [
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
index b59a0112b..b39c75a2e 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
@@ -42,7 +42,7 @@
           allowClear: true,
         },
         afterItem: () =>
-          h('span', { class: 'conf-switch' }, 
t('setting.flinkHome.operateMessage.flinkNameTips')),
+          h('span', { class: 'pop-tip' }, 
t('setting.flinkHome.operateMessage.flinkNameTips')),
         rules: [
           { required: true, message: 
t('setting.flinkHome.operateMessage.flinkNameIsRequired') },
         ],
@@ -56,7 +56,7 @@
           allowClear: true,
         },
         afterItem: () =>
-          h('span', { class: 'conf-switch' }, 
t('setting.flinkHome.operateMessage.flinkHomeTips')),
+          h('span', { class: 'pop-tip' }, 
t('setting.flinkHome.operateMessage.flinkHomeTips')),
         rules: [
           { required: true, message: 
t('setting.flinkHome.operateMessage.flinkHomeIsRequired') },
         ],
@@ -172,10 +172,3 @@
     </div>
   </BasicModal>
 </template>
-<style lang="less">
-  .conf-switch {
-    display: inline-block;
-    margin-top: 10px;
-    color: darkgrey;
-  }
-</style>
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 86bc2bf8d..49854e74e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.commons.io.FileUtils
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
 import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, 
SavepointRestoreSettings}
 
 import javax.annotation.Nullable
@@ -34,7 +35,7 @@ import java.io.File
 import java.util.{Map => JavaMap}
 
 import scala.collection.JavaConversions._
-import scala.util.Try
+import scala.util.{Success, Try}
 
 case class SubmitRequest(
     flinkVersion: FlinkVersion,
@@ -98,6 +99,26 @@ case class SubmitRequest(
     }
   }
 
+  lazy val flinkDefaultConfiguration: Configuration = {
+    
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) 
match {
+      case Success(value) =>
+        value
+          .keySet()
+          .foreach(
+            k => {
+              val v = value.getString(k, null)
+              if (v != null) {
+                val result = v
+                  .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", 
effectiveAppName)
+                  .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
+                value.setString(k, result)
+              }
+            })
+        value
+      case _ => new Configuration()
+    }
+  }
+
   def hasProp(key: String): Boolean = properties.containsKey(key)
 
   def getProp(key: String): Any = properties.get(key)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index db0c3db71..eb49ad4a1 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -184,15 +184,11 @@ object YarnSessionClient extends YarnClientTrait {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {
-      val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.properties.foreach(
-        m =>
-          m._2 match {
-            case v if v != null => flinkConfig.setString(m._1, m._2.toString)
-            case _ =>
-          })
-      flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
shutDownRequest.clusterId)
-      flinkConfig.safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
+      val flinkConfig = new Configuration()
+      flinkConfig
+        .safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
+        .safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
+        .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
       val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
       val applicationId: ApplicationId = yarnClusterDescriptor._1
       clusterDescriptor = yarnClusterDescriptor._2
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index ff7c7304c..896767ca3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -114,11 +114,11 @@ trait FlinkClientTrait extends Logger {
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
 
     if 
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
-      val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-        submitRequest.flinkVersion.flinkHome)
       // state.checkpoints.num-retained
       val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
-      flinkConfig.safeSet(retainedOption, 
flinkDefaultConfiguration.get(retainedOption))
+      flinkConfig.safeSet(
+        retainedOption,
+        submitRequest.flinkDefaultConfiguration.get(retainedOption))
     }
 
     // 2) set savepoint parameter
@@ -279,29 +279,20 @@ trait FlinkClientTrait extends Logger {
     throw new IllegalStateException("No valid command-line found.")
   }
 
-  private[client] def getFlinkDefaultConfiguration(flinkHome: String): 
Configuration = {
-    
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new 
Configuration())
-  }
-
-  private[client] def getOptionFromDefaultFlinkConfig[T](
-      flinkHome: String,
-      option: ConfigOption[T]): T = {
-    getFlinkDefaultConfiguration(flinkHome).get(option)
-  }
-
   private[this] def getCustomCommandLines(flinkHome: String): 
JavaList[CustomCommandLine] = {
-    val flinkDefaultConfiguration: Configuration = 
getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
     val configurationDirectory = s"$flinkHome/conf"
     // 2. load the custom command lines
-    loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
+    val flinkConfig =
+      
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new 
Configuration())
+    loadCustomCommandLines(flinkConfig, configurationDirectory)
   }
 
   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
     if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
       
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
-      getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
+      submitRequest.flinkDefaultConfiguration
         .getInteger(CoreOptions.DEFAULT_PARALLELISM, 
CoreOptions.DEFAULT_PARALLELISM.defaultValue())
     }
   }
@@ -371,20 +362,20 @@ trait FlinkClientTrait extends Logger {
 
     val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)
 
-    val activeCommandLine = validateAndGetActiveCommandLine(
-      getCustomCommandLines(submitRequest.flinkVersion.flinkHome),
-      commandLine)
+    val activeCommandLine = {
+      val customCommandLines: JavaList[CustomCommandLine] = {
+        // 1. find the configuration directory
+        val configurationDirectory = s"${submitRequest.flinkVersion.flinkHome}/conf"
+        // 2. load the custom command lines
+        loadCustomCommandLines(submitRequest.flinkDefaultConfiguration, configurationDirectory)
+      }
+      validateAndGetActiveCommandLine(customCommandLines, commandLine)
+    }
 
-    val configuration =
-      applyConfiguration(
-        submitRequest.flinkVersion.flinkHome,
-        activeCommandLine,
-        commandLine,
-        submitRequest.id.toString,
-        submitRequest.effectiveAppName)
+    val configuration = new Configuration(submitRequest.flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))
 
     commandLine -> configuration
-
   }
 
   private[client] def getCommandLineOptions(flinkHome: String) = {
@@ -417,10 +408,16 @@ trait FlinkClientTrait extends Logger {
       }
       FlinkRunOption.parse(commandLineOptions, cliArgs, true)
     }
+
     val activeCommandLine =
       validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine)
-    val flinkConfig = applyConfiguration(flinkHome, activeCommandLine, commandLine)
-    flinkConfig
+
+    val flinkDefaultConfiguration =
+      Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+
+    val configuration = new Configuration(flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))
+    configuration
   }
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
@@ -445,32 +442,6 @@ trait FlinkClientTrait extends Logger {
     Lists.newArrayList(programArgs: _*)
   }
 
-  private[this] def applyConfiguration(
-      flinkHome: String,
-      activeCustomCommandLine: CustomCommandLine,
-      commandLine: CommandLine,
-      jobId: String = null,
-      jobName: String = null): Configuration = {
-
-    require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.")
-    val configuration = new Configuration()
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
-    flinkDefaultConfiguration.keySet.foreach(
-      key => {
-        val value = flinkDefaultConfiguration.getString(key, null)
-        var result = value
-        if (value != null && StringUtils.isNotBlank(jobName)) {
-          result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
-        }
-        if (jobId != null) {
-          result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
-        }
-        configuration.setString(key, result)
-      })
-    configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
-    configuration
-  }
-
   implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: Configuration) {
     def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
       flinkConfig match {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 56267cbda..fc2d073fc 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -196,6 +196,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
           .mergeWith(ParameterTool.fromMap(appConf))
           .mergeWith(ParameterTool.fromMap(tableConf))
           .mergeWith(ParameterTool.fromMap(sqlConf))
+          .mergeWith(cliParameterTool)
 
         FlinkConfiguration(parameterTool, envConfig, tableConfig)
       }


Reply via email to