This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 072998b4f [Improve] support the variable in Flink configuration (#4065)
072998b4f is described below
commit 072998b4f365c7670ec00fa9299f774a7468d8cd
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 15 15:06:08 2024 +0800
[Improve] support the variable in Flink configuration (#4065)
* [Improve] support the variable in Flink configuration
* [Improve] program args bug fixed.
* [Improve] modal style improvement
---
.../streampark/common/utils/CommonUtils.java | 26 -------
.../streampark/console/core/entity/FlinkEnv.java | 14 ----
.../console/core/service/FlinkEnvService.java | 4 ++
.../core/service/impl/ApplicationServiceImpl.java | 4 +-
.../core/service/impl/FlinkEnvServiceImpl.java | 26 +++++++
.../core/service/impl/SavepointServiceImpl.java | 10 ++-
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 +-
.../src/design/index.less | 12 ++++
.../src/design/public.less | 5 --
.../components/AppView/StartApplicationModal.vue | 8 +--
.../components/AppView/StopApplicationModal.vue | 7 +-
.../src/views/flink/app/hooks/useApp.tsx | 10 +--
.../src/views/flink/app/hooks/useFlinkRender.tsx | 2 +-
.../src/views/flink/app/styles/Add.less | 1 +
.../flink/variable/components/VariableModal.vue | 10 +--
.../views/setting/Alarm/components/AlertModal.vue | 2 +-
.../setting/ExternalLink/components/Modal.vue | 2 +-
.../views/setting/FlinkHome/components/Modal.vue | 11 +--
.../flink/client/bean/SubmitRequest.scala | 23 +++++-
.../flink/client/impl/YarnSessionClient.scala | 14 ++--
.../flink/client/trait/FlinkClientTrait.scala | 81 +++++++---------------
.../flink/core/FlinkTableInitializer.scala | 1 +
22 files changed, 121 insertions(+), 154 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
deleted file mode 100644
index eb022491c..000000000
---
a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.common.utils;
-
-public class CommonUtils {
- private CommonUtils() {}
-
- public static String fixedValueBaseVar(String configValue, String jobName) {
- return configValue.replace("${jobName}", jobName);
- }
-}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 4662aeb6e..9ccf90773 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.utils.CommonUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -38,7 +37,6 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
-import java.util.Properties;
@Getter
@Setter
@@ -115,18 +113,6 @@ public class FlinkEnv implements Serializable {
return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
}
- @JsonIgnore
- public Properties getFlinkConfig(Application application) {
- String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
- Properties flinkConfig = new Properties();
- Map<String, String> config =
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
- for (Map.Entry<String, String> entry : config.entrySet()) {
- String value = CommonUtils.fixedValueBaseVar(entry.getValue(),
application.getJobName());
- flinkConfig.setProperty(entry.getKey(), value);
- }
- return flinkConfig;
- }
-
@JsonIgnore
public FlinkVersion getFlinkVersion() {
if (this.flinkVersion == null) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index bbcbe8af8..53930320c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -18,12 +18,14 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkEnv;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import java.io.IOException;
+import java.util.Properties;
public interface FlinkEnvService extends IService<FlinkEnv> {
@@ -97,4 +99,6 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
void validity(Long id);
IPage<FlinkEnv> findPage(FlinkEnv flinkEnv, RestRequest restRequest);
+
+ Properties getFlinkConfig(FlinkEnv flinkEnv, Application application);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 881706aca..a15aefaee 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1820,7 +1820,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String archiveDir =
-
flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+ flinkEnvService
+ .getFlinkConfig(flinkEnv, application)
+ .getProperty(JobManagerOptions.ARCHIVE_DIR.key());
if (archiveDir != null) {
properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 9a71f896c..1f97dd6a0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -17,9 +17,12 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.Project;
import org.apache.streampark.console.core.mapper.FlinkEnvMapper;
@@ -27,6 +30,8 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.commons.lang3.StringUtils;
+
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -40,6 +45,8 @@ import
org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
import java.util.Date;
+import java.util.Map;
+import java.util.Properties;
@Slf4j
@Service
@@ -164,6 +171,25 @@ public class FlinkEnvServiceImpl extends
ServiceImpl<FlinkEnvMapper, FlinkEnv>
return this.baseMapper.findPage(page, flinkEnv);
}
+ @Override
+ public Properties getFlinkConfig(FlinkEnv flinkEnv, Application application)
{
+ String flinkYamlString =
DeflaterUtils.unzipString(flinkEnv.getFlinkConf());
+ Properties flinkConfig = new Properties();
+ Map<String, String> config =
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ String value = entry.getValue();
+ if (StringUtils.isNotBlank(application.getJobName())) {
+ value =
+ value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
application.getJobName());
+ }
+ if (application.getId() != null) {
+ value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)",
application.getId().toString());
+ }
+ flinkConfig.setProperty(entry.getKey(), value);
+ }
+ return flinkConfig;
+ }
+
private void checkOrElseAlert(FlinkEnv flinkEnv) {
// 1.check exists
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ebefe6197..cfc112842 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -156,7 +156,7 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
if (cpThreshold == 0) {
String flinkConfNumRetained =
- flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
+ flinkEnvService.getFlinkConfig(flinkEnv,
application).getProperty(numRetainedKey);
int numRetainedDefaultValue = 1;
if (flinkConfNumRetained != null) {
try {
@@ -293,7 +293,7 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
if (StringUtils.isBlank(savepointPath)) {
// flink
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
- Properties flinkConfig = flinkEnv.getFlinkConfig(application);
+ Properties flinkConfig = flinkEnvService.getFlinkConfig(flinkEnv,
application);
savepointPath =
flinkConfig.getProperty(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
@@ -306,10 +306,8 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
@Override
public String processPath(String path, String jobName, Long jobId) {
if (StringUtils.isNotBlank(path)) {
- return path.replaceAll("\\$job(Id|id)", jobId.toString())
- .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
- .replaceAll("\\$job(Name|name)", jobName)
- .replaceAll("\\$\\{job(Name|name)}", jobName);
+ return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
jobName)
+ .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
}
return path;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index a09f85ef9..123c399a5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -119,7 +119,7 @@ public class FlinkK8sWatcherWrapper {
public TrackId toTrackId(Application app) {
FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
- Properties properties = flinkEnv.getFlinkConfig(app);
+ Properties properties = flinkEnvService.getFlinkConfig(flinkEnv, app);
Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
diff --git a/streampark-console/streampark-console-webapp/src/design/index.less
b/streampark-console/streampark-console-webapp/src/design/index.less
index d787ae632..6cfb2301b 100644
--- a/streampark-console/streampark-console-webapp/src/design/index.less
+++ b/streampark-console/streampark-console-webapp/src/design/index.less
@@ -115,3 +115,15 @@ textarea.ant-input,
.ant-upload.ant-upload-drag {
border-radius: 1px !important;
}
+
+.pop-tip {
+ display: inline-block;
+ margin-top: 5px;
+ color: darkgrey;
+}
+
+[data-theme='dark'] {
+ .pop-tip {
+ color: #666;
+ }
+}
diff --git
a/streampark-console/streampark-console-webapp/src/design/public.less
b/streampark-console/streampark-console-webapp/src/design/public.less
index 547c88b53..ba112f7d1 100644
--- a/streampark-console/streampark-console-webapp/src/design/public.less
+++ b/streampark-console/streampark-console-webapp/src/design/public.less
@@ -56,8 +56,3 @@
opacity: 0.75;
}
}
-
-.extra .conf-switch {
- color: darkgrey;
- margin-left: 8px;
-}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 6aef8c2f4..7e8e27421 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -75,8 +75,8 @@
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
- defaultValue: true,
- afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.savepointTip')),
+ defaultValue: receiveData.historySavePoint &&
receiveData.historySavePoint.length,
+ afterItem: () => h('span', { class: 'pop-tip' },
t('flink.app.view.savepointTip')),
},
{
field: 'savepointPath',
@@ -86,7 +86,7 @@
? 'Select'
: 'Input',
afterItem: () =>
- h('span', { class: 'conf-switch' },
handleSavePointTip(receiveData.historySavePoint)),
+ h('span', { class: 'pop-tip' },
handleSavePointTip(receiveData.historySavePoint)),
slot: 'savepoint',
ifShow: ({ values }) => values.restoreSavepoint,
required: true,
@@ -99,7 +99,7 @@
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
- afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.ignoreRestoredTip')),
+ afterItem: () => h('span', { class: 'pop-tip' },
t('flink.app.view.ignoreRestoredTip')),
defaultValue: false,
ifShow: ({ values }) => values.restoreSavepoint,
},
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index 5056dea35..4b4a09b33 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -54,9 +54,8 @@
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
- defaultValue: true,
- afterItem: () =>
- h('span', { class: 'conf-switch' },
t('flink.app.operation.enableSavePoint')),
+ defaultValue: false,
+ afterItem: () => h('span', { class: 'pop-tip' },
t('flink.app.operation.enableSavePoint')),
},
{
field: 'customSavepoint',
@@ -78,7 +77,7 @@
},
defaultValue: false,
ifShow: ({ values }) => !!values.triggerSavepoint,
- afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.operation.enableDrain')),
+ afterItem: () => h('span', { class: 'pop-tip' },
t('flink.app.operation.enableDrain')),
},
],
colon: true,
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index b0b627545..6ed5bfe92 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -226,11 +226,7 @@ export const useFlinkApplication = (openStartModal: Fn) =>
{
],
content: () => {
return (
- <Form
- class="!pt-40px"
- layout='vertical'
- baseColProps = {{ span: 20, offset: 2 }}
- >
+ <Form class="!pt-40px" layout="vertical" baseColProps={{ span: 20,
offset: 2 }}>
<Form.Item
label="Job Name"
validateStatus={unref(validateStatus)}
@@ -321,8 +317,8 @@ export const useFlinkApplication = (openStartModal: Fn) => {
class="!pt-40px"
ref={mappingRef}
name="mappingForm"
- baseColProps = {{ span: 20, offset: 2 }}
- layout='vertical'
+ baseColProps={{ span: 20, offset: 2 }}
+ layout="vertical"
v-model:model={formValue}
>
<Form.Item label="Job Name">
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 07e222003..61a2ade8f 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -232,7 +232,7 @@ export const renderOptionsItems = (
rules={[{ validator: conf.validator }]}
/>
)}
- {conf.type === 'switch' && <span
class="conf-switch">({conf.placeholder})</span>}
+ {conf.type === 'switch' && <span>({conf.placeholder})</span>}
<p class="conf-desc"> {descriptionFilter(conf)} </p>
</Form.Item>
);
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
index 5159a19b8..60c599971 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
@@ -75,6 +75,7 @@
.conf-desc {
color: darkgrey;
margin-bottom: 0;
+ margin-top: 5px;
}
.sql-desc {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
index f4cdafae5..f2f71ed2e 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/variable/components/VariableModal.vue
@@ -120,7 +120,7 @@
},
defaultValue: false,
afterItem: () =>
- h('span', { class: 'conf-switch' },
t('flink.variable.form.desensitizationDesc')),
+ h('span', { class: 'pop-tip' },
t('flink.variable.form.desensitizationDesc')),
},
{
field: 'description',
@@ -188,11 +188,3 @@
}
}
</script>
-
-<style lang="less">
- .conf-switch {
- display: inline-block;
- margin-top: 10px;
- color: darkgrey;
- }
-</style>
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
b/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
index 287f1c123..161639a20 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/setting/Alarm/components/AlertModal.vue
@@ -56,7 +56,7 @@
allowClear: true,
placeholder: t('setting.alarm.alertNamePlaceHolder'),
},
- afterItem: () => h('span', { class: 'conf-switch' },
t('setting.alarm.alertNameTips')),
+ afterItem: () => h('span', { class: 'pop-tip' },
t('setting.alarm.alertNameTips')),
dynamicRules: () => {
return [
{
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
b/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
index 786b65a88..10cff60ff 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/setting/ExternalLink/components/Modal.vue
@@ -80,7 +80,7 @@
afterItem: () =>
h(
'span',
- { class: 'conf-switch' },
+ { class: 'pop-tip' },
'Supported variables: {job_id}, {yarn_id}, {job_name}, Example:
https://grafana/flink-monitoring?var-JobId=var-JobId={job_id}',
),
rules: [
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
index b59a0112b..b39c75a2e 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
@@ -42,7 +42,7 @@
allowClear: true,
},
afterItem: () =>
- h('span', { class: 'conf-switch' },
t('setting.flinkHome.operateMessage.flinkNameTips')),
+ h('span', { class: 'pop-tip' },
t('setting.flinkHome.operateMessage.flinkNameTips')),
rules: [
{ required: true, message:
t('setting.flinkHome.operateMessage.flinkNameIsRequired') },
],
@@ -56,7 +56,7 @@
allowClear: true,
},
afterItem: () =>
- h('span', { class: 'conf-switch' },
t('setting.flinkHome.operateMessage.flinkHomeTips')),
+ h('span', { class: 'pop-tip' },
t('setting.flinkHome.operateMessage.flinkHomeTips')),
rules: [
{ required: true, message:
t('setting.flinkHome.operateMessage.flinkHomeIsRequired') },
],
@@ -172,10 +172,3 @@
</div>
</BasicModal>
</template>
-<style lang="less">
- .conf-switch {
- display: inline-block;
- margin-top: 10px;
- color: darkgrey;
- }
-</style>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 86bc2bf8d..49854e74e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.io.FileUtils
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions,
SavepointRestoreSettings}
import javax.annotation.Nullable
@@ -34,7 +35,7 @@ import java.io.File
import java.util.{Map => JavaMap}
import scala.collection.JavaConversions._
-import scala.util.Try
+import scala.util.{Success, Try}
case class SubmitRequest(
flinkVersion: FlinkVersion,
@@ -98,6 +99,26 @@ case class SubmitRequest(
}
}
+ lazy val flinkDefaultConfiguration: Configuration = {
+
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf"))
match {
+ case Success(value) =>
+ value
+ .keySet()
+ .foreach(
+ k => {
+ val v = value.getString(k, null)
+ if (v != null) {
+ val result = v
+ .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
effectiveAppName)
+ .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
+ value.setString(k, result)
+ }
+ })
+ value
+ case _ => new Configuration()
+ }
+ }
+
def hasProp(key: String): Boolean = properties.containsKey(key)
def getProp(key: String): Any = properties.get(key)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index db0c3db71..eb49ad4a1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -184,15 +184,11 @@ object YarnSessionClient extends YarnClientTrait {
var clusterDescriptor: YarnClusterDescriptor = null
var client: ClusterClient[ApplicationId] = null
try {
- val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
- shutDownRequest.properties.foreach(
- m =>
- m._2 match {
- case v if v != null => flinkConfig.setString(m._1, m._2.toString)
- case _ =>
- })
- flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID,
shutDownRequest.clusterId)
- flinkConfig.safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
+ val flinkConfig = new Configuration()
+ flinkConfig
+ .safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
+ .safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
+ .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
val applicationId: ApplicationId = yarnClusterDescriptor._1
clusterDescriptor = yarnClusterDescriptor._2
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index ff7c7304c..896767ca3 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -114,11 +114,11 @@ trait FlinkClientTrait extends Logger {
.safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
submitRequest.jobId)
if
(!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
- submitRequest.flinkVersion.flinkHome)
// state.checkpoints.num-retained
val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
- flinkConfig.safeSet(retainedOption,
flinkDefaultConfiguration.get(retainedOption))
+ flinkConfig.safeSet(
+ retainedOption,
+ submitRequest.flinkDefaultConfiguration.get(retainedOption))
}
// 2) set savepoint parameter
@@ -279,29 +279,20 @@ trait FlinkClientTrait extends Logger {
throw new IllegalStateException("No valid command-line found.")
}
- private[client] def getFlinkDefaultConfiguration(flinkHome: String):
Configuration = {
-
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new
Configuration())
- }
-
- private[client] def getOptionFromDefaultFlinkConfig[T](
- flinkHome: String,
- option: ConfigOption[T]): T = {
- getFlinkDefaultConfiguration(flinkHome).get(option)
- }
-
private[this] def getCustomCommandLines(flinkHome: String):
JavaList[CustomCommandLine] = {
- val flinkDefaultConfiguration: Configuration =
getFlinkDefaultConfiguration(flinkHome)
// 1. find the configuration directory
val configurationDirectory = s"$flinkHome/conf"
// 2. load the custom command lines
- loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
+ val flinkConfig =
+
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new
Configuration())
+ loadCustomCommandLines(flinkConfig, configurationDirectory)
}
private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
} else {
- getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
+ submitRequest.flinkDefaultConfiguration
.getInteger(CoreOptions.DEFAULT_PARALLELISM,
CoreOptions.DEFAULT_PARALLELISM.defaultValue())
}
}
@@ -371,20 +362,20 @@ trait FlinkClientTrait extends Logger {
val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)
- val activeCommandLine = validateAndGetActiveCommandLine(
- getCustomCommandLines(submitRequest.flinkVersion.flinkHome),
- commandLine)
+ val activeCommandLine = {
+ val customCommandLines: JavaList[CustomCommandLine] = {
+ // 1. find the configuration directory
+ val configurationDirectory =
s"${submitRequest.flinkVersion.flinkHome}/conf"
+ // 2. load the custom command lines
+ loadCustomCommandLines(submitRequest.flinkDefaultConfiguration,
configurationDirectory)
+ }
+ validateAndGetActiveCommandLine(customCommandLines, commandLine)
+ }
- val configuration =
- applyConfiguration(
- submitRequest.flinkVersion.flinkHome,
- activeCommandLine,
- commandLine,
- submitRequest.id.toString,
- submitRequest.effectiveAppName)
+ val configuration = new
Configuration(submitRequest.flinkDefaultConfiguration)
+ configuration.addAll(activeCommandLine.toConfiguration(commandLine))
commandLine -> configuration
-
}
private[client] def getCommandLineOptions(flinkHome: String) = {
@@ -417,10 +408,16 @@ trait FlinkClientTrait extends Logger {
}
FlinkRunOption.parse(commandLineOptions, cliArgs, true)
}
+
val activeCommandLine =
validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome),
commandLine)
- val flinkConfig = applyConfiguration(flinkHome, activeCommandLine,
commandLine)
- flinkConfig
+
+ val flinkDefaultConfiguration =
+
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new
Configuration())
+
+ val configuration = new Configuration(flinkDefaultConfiguration)
+ configuration.addAll(activeCommandLine.toConfiguration(commandLine))
+ configuration
}
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
@@ -445,32 +442,6 @@ trait FlinkClientTrait extends Logger {
Lists.newArrayList(programArgs: _*)
}
- private[this] def applyConfiguration(
- flinkHome: String,
- activeCustomCommandLine: CustomCommandLine,
- commandLine: CommandLine,
- jobId: String = null,
- jobName: String = null): Configuration = {
-
- require(activeCustomCommandLine != null, "activeCustomCommandLine must not
be null.")
- val configuration = new Configuration()
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
- flinkDefaultConfiguration.keySet.foreach(
- key => {
- val value = flinkDefaultConfiguration.getString(key, null)
- var result = value
- if (value != null && StringUtils.isNotBlank(jobName)) {
- result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)",
jobName)
- }
- if (jobId != null) {
- result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
- }
- configuration.setString(key, result)
- })
- configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
- configuration
- }
-
implicit private[client] class EnhanceFlinkConfiguration(flinkConfig:
Configuration) {
def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
flinkConfig match {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 56267cbda..fc2d073fc 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -196,6 +196,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(ParameterTool.fromMap(tableConf))
.mergeWith(ParameterTool.fromMap(sqlConf))
+ .mergeWith(cliParameterTool)
FlinkConfiguration(parameterTool, envConfig, tableConfig)
}