This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch session-cluster in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit d2f67fd60afe82ff65d5cea3ee2e93d0dd95dbc2 Author: benjobs <[email protected]> AuthorDate: Sat Nov 26 02:02:31 2022 +0800 [Bug] flink yarn session cluster bug fixed --- .../core/controller/FlinkClusterController.java | 3 +- .../console/core/entity/Application.java | 9 ++- .../console/core/entity/FlinkCluster.java | 46 +++++++++-- .../console/core/mapper/FlinkClusterMapper.java | 2 +- .../console/core/service/FlinkClusterService.java | 4 +- .../core/service/impl/FlinkClusterServiceImpl.java | 93 +++++++++++----------- .../resources/mapper/core/FlinkClusterMapper.xml | 7 +- .../src/views/flink/app/utils/index.ts | 4 - .../src/views/flink/setting/AddCluster.vue | 15 +--- .../src/views/flink/setting/EditCluster.vue | 16 +--- .../views/flink/setting/hooks/useClusterSetting.ts | 85 ++++++++++++-------- .../flink/submit/bean/DeployRequest.scala | 1 - .../impl/KubernetesNativeSessionSubmit.scala | 3 +- .../flink/submit/impl/YarnSessionSubmit.scala | 1 - 14 files changed, 160 insertions(+), 129 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java index 5927012e5..2482ad13c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java @@ -57,7 +57,7 @@ public class FlinkClusterController { @PostMapping("check") public RestResponse check(FlinkCluster cluster) { - String checkResult = flinkClusterService.check(cluster); + ResponseResult checkResult = flinkClusterService.check(cluster); return RestResponse.success(checkResult); } @@ -78,7 +78,6 @@ public class FlinkClusterController { flinkCluster.setAddress(cluster.getAddress()); flinkCluster.setExecutionMode(cluster.getExecutionMode()); flinkCluster.setDynamicProperties(cluster.getDynamicProperties()); - flinkCluster.setFlameGraph(cluster.getFlameGraph()); flinkCluster.setFlinkImage(cluster.getFlinkImage()); flinkCluster.setOptions(cluster.getOptions()); flinkCluster.setYarnQueue(cluster.getYarnQueue()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 8be656d51..7765869bd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -482,9 +482,12 @@ public class Application implements Serializable { @SneakyThrows @SuppressWarnings("unchecked") public Map<String, Object> getOptionMap() { - Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class); - map.entrySet().removeIf(entry -> entry.getValue() == null); - return map; + if (StringUtils.isNotEmpty(this.options)) { + Map<String, Object> map = JacksonUtils.read(this.options, Map.class); + map.entrySet().removeIf(entry -> entry.getValue() == null); + return map; + } + return Collections.emptyMap(); } @JsonIgnore diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 9ee264226..965bfbda4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType; import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.util.HttpClientUtils; import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.core.metrics.flink.Overview; import org.apache.streampark.flink.submit.FlinkSubmitter; import com.baomidou.mybatisplus.annotation.IdType; @@ -35,12 +36,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.CoreOptions; import org.apache.http.client.config.RequestConfig; import java.io.Serializable; import java.net.MalformedURLException; import java.net.URI; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -86,8 +89,6 @@ public class FlinkCluster implements Serializable { private Integer k8sRestExposedType; - private Boolean flameGraph; - private String k8sConf; private Integer resolveOrder; @@ -115,13 +116,18 @@ public class FlinkCluster implements Serializable { @JsonIgnore @SneakyThrows public Map<String, Object> getOptionMap() { - Map<String, Object> map = JacksonUtils.read(getOptions(), Map.class); - if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) { - map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName); - map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue); + if (StringUtils.isNotEmpty(this.options)) { + Map<String, Object> map = JacksonUtils.read(this.options, Map.class); + if (ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) { + map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName); + if (StringUtils.isNotEmpty(this.yarnQueue)) { + map.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.yarnQueue); + } + } + map.entrySet().removeIf(entry -> entry.getValue() == null); + return map; } - map.entrySet().removeIf(entry -> entry.getValue() == null); - return map; + return Collections.emptyMap(); } @JsonIgnore @@ -159,6 +165,30 @@ public class FlinkCluster implements Serializable { return false; } + public boolean verifyFlinkYarnCluster() { + if (address == null) { + return false; + } + String[] array = address.split(","); + for (String url : array) { + try { + new URI(url); + } catch (Exception ignored) { + return false; + } + try { + String format = "%s/proxy/%s/overview"; + String yarnSessionUrl = String.format(format, url, this.clusterId); + String result = HttpClientUtils.httpGetRequest(yarnSessionUrl, RequestConfig.custom().setConnectTimeout(2000).build()); + JacksonUtils.read(result, Overview.class); + return true; + } catch (Exception ignored) { + // + } + } + return false; + } + @JsonIgnore public Map<String, String> getFlinkConfig() throws MalformedURLException, JsonProcessingException { URI activeAddress = this.getActiveAddress(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java index 428aa8e99..d9e9b6558 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java @@ -24,7 +24,7 @@ import org.apache.ibatis.annotations.Param; public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> { - Boolean existsByClusterId(@Param("clusterId") String clusterId); + Boolean existsByClusterId(@Param("clusterId") String clusterId, @Param("id") Long id); Boolean existsByClusterName(@Param("clusterName") String clusterName, @Param("id") Long id); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java index a2c525441..2be56cb3e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java @@ -24,7 +24,7 @@ import com.baomidou.mybatisplus.extension.service.IService; public interface FlinkClusterService extends IService<FlinkCluster> { - String check(FlinkCluster flinkCluster); + ResponseResult check(FlinkCluster flinkCluster); ResponseResult create(FlinkCluster flinkCluster); @@ -36,7 +36,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> { ResponseResult shutdown(FlinkCluster flinkCluster); - Boolean existsByClusterId(String clusterId); + Boolean existsByClusterId(String clusterId, Long id); Boolean existsByClusterName(String clusterName, Long id); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index da14fa90a..8217aee40 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -17,11 +17,10 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.ThreadUtils; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.streampark.console.core.bean.ResponseResult; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.FlinkEnv; @@ -49,10 +48,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; -import java.io.Serializable; import java.util.Date; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -84,44 +80,60 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli private SettingService settingService; @Override - public String check(FlinkCluster cluster) { - if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) { - return "error"; - } - //1) Check if name is duplicate, if it already exists + public ResponseResult check(FlinkCluster cluster) { + ResponseResult result = new ResponseResult(); + result.setStatus(0); + + //1) Check name is already exists Boolean existsByClusterName = this.existsByClusterName(cluster.getClusterName(), cluster.getId()); if (existsByClusterName) { - return "exists"; + result.setMsg("clusterName is already exists,please check!"); + result.setStatus(1); + return result; + } + + //2) Check target-cluster is already exists + String clusterId = cluster.getClusterId(); + if (StringUtils.isNotEmpty(clusterId)) { + Boolean existsByClusterId = this.existsByClusterId(clusterId, cluster.getId()); + if (existsByClusterId) { + result.setMsg("the clusterId " + clusterId + " is already exists,please check!"); + result.setStatus(2); + return result; + } } + // 3) Check connection if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) { - //2) Check if the connection can be made to - return cluster.verifyConnection() ? "success" : "fail"; + if (!cluster.verifyConnection()) { + result.setMsg("the remote cluster connection failed, please check!"); + result.setStatus(3); + return result; + } + } else if (ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())) { + if (cluster.getId() == null && !StringUtils.isAllBlank(cluster.getAddress(), cluster.getClusterId())) { + if (!cluster.verifyFlinkYarnCluster()) { + result.setMsg("the flink cluster connection failed, please check!"); + result.setStatus(4); + return result; + } + } } - return "success"; + return result; } @Override public ResponseResult create(FlinkCluster flinkCluster) { ResponseResult result = new ResponseResult(); - if (StringUtils.isBlank(flinkCluster.getClusterName())) { - result.setMsg("clusterName can't empty!"); - result.setStatus(0); - return result; - } - String clusterId = flinkCluster.getClusterId(); - if (StringUtils.isNoneBlank(clusterId)) { - Boolean existsByClusterId = this.existsByClusterId(clusterId); - if (existsByClusterId) { - result.setMsg("the clusterId" + clusterId + "is already exists,please check!"); - result.setStatus(0); - return result; - } - } flinkCluster.setUserId(commonService.getUserId()); flinkCluster.setCreateTime(new Date()); - // remote mode directly set STARTED - if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) { + if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) { + if (StringUtils.isAllBlank(flinkCluster.getAddress(), flinkCluster.getClusterId())) { + flinkCluster.setClusterState(ClusterState.CREATED.getValue()); + } else { + flinkCluster.setClusterState(ClusterState.STARTED.getValue()); + } + } else if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) { flinkCluster.setClusterState(ClusterState.STARTED.getValue()); } else { flinkCluster.setClusterState(ClusterState.CREATED.getValue()); @@ -167,7 +179,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli flinkEnv.getFlinkVersion(), flinkCluster.getClusterId(), executionModeEnum, - flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : null, flinkCluster.getProperties(), kubernetesDeployParam ); @@ -176,8 +187,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS); if (null != deployResponse) { if (deployResponse.message() == null) { - updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId()); updateWrapper.set(FlinkCluster::getAddress, deployResponse.address()); + updateWrapper.set(FlinkCluster::getClusterId, deployResponse.clusterId()); updateWrapper.set(FlinkCluster::getClusterState, ClusterState.STARTED.getValue()); updateWrapper.set(FlinkCluster::getException, null); update(updateWrapper); @@ -198,8 +209,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli updateWrapper.set(FlinkCluster::getException, e.toString()); update(updateWrapper); result.setStatus(0); - result.setMsg("deploy cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e)); - return result; + throw new ApiDetailException(e); } } @@ -277,8 +287,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli } @Override - public Boolean existsByClusterId(String clusterId) { - return this.baseMapper.existsByClusterId(clusterId); + public Boolean existsByClusterId(String clusterId, Long id) { + return this.baseMapper.existsByClusterId(clusterId, id); } @Override @@ -307,15 +317,4 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli return result; } - private Map<String, Serializable> getFlameGraph(FlinkCluster flinkCluster) { - Map<String, Serializable> flameGraph = new HashMap<>(8); - flameGraph.put("reporter", "org.apache.streampark.plugin.profiling.reporter.HttpReporter"); - flameGraph.put("type", ApplicationType.STREAMPARK_FLINK.getType()); - flameGraph.put("id", flinkCluster.getId()); - flameGraph.put("url", settingService.getStreamParkAddress().concat("/metrics/report")); - flameGraph.put("token", Utils.uuid()); - flameGraph.put("sampleInterval", 1000 * 60 * 2); - flameGraph.put("metricInterval", 1000 * 60 * 2); - return flameGraph; - } } diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml index 1e1a85c9d..e22a44869 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml @@ -47,7 +47,12 @@ <select id="existsByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.String"> select count(1) from t_flink_cluster - where cluster_id=#{clusterId} + <where> + cluster_id=#{clusterId} + <if test="id != null"> + and id <> #{id} + </if> + </where> limit 1 </select> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts index 21d5d1944..4f1b42492 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts @@ -291,7 +291,3 @@ export const filterOption = (input: string, options: Recordable) => { export function isK8sExecMode(mode: number): boolean { return [ExecModeEnum.KUBERNETES_SESSION, ExecModeEnum.KUBERNETES_APPLICATION].includes(mode); } -// session mode -export function isSessionMode(mode: number): boolean { - return [ExecModeEnum.YARN_SESSION, ExecModeEnum.KUBERNETES_SESSION].includes(mode); -} diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue index 0ee7c4afb..f67f0cbe7 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/AddCluster.vue @@ -50,7 +50,8 @@ const params = handleSubmitParams(values); if (Object.keys(params).length > 0) { const res = await fetchCheckCluster(params); - if (res === 'success') { + const status = parseInt(res.status); + if (status === 0) { const resp = await fetchCreateCluster(params); if (resp.status) { Swal.fire({ @@ -63,18 +64,8 @@ } else { Swal.fire(resp.msg); } - } else if (res === 'exists') { - Swal.fire( - 'Failed', - 'the cluster name: ' + values.clusterName + ' is already exists,please check', - 'error', - ); } else { - Swal.fire( - 'Failed', - 'the address is invalid or connection failure, please check', - 'error', - ); + Swal.fire('Failed', res.msg, 'error'); } } } catch (error) { diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue index 0b58d3842..32836455e 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/EditCluster.vue @@ -65,7 +65,8 @@ id: cluster.id, }); const res = await fetchCheckCluster(params); - if (res === 'success') { + const status = parseInt(res.status); + if (status === 0) { const resp = await fetchUpdateCluster(params); if (resp.status) { Swal.fire({ @@ -78,18 +79,8 @@ } else { Swal.fire(resp.data.msg); } - } else if (res === 'exists') { - Swal.fire( - 'Failed', - 'the cluster name: ' + values.clusterName + ' is already exists,please check', - 'error', - ); } else { - Swal.fire( - 'Failed', - 'the address is invalid or connection failure, please check', - 'error', - ); + Swal.fire('Failed', res.msg, 'error'); } } } catch (error) { @@ -122,7 +113,6 @@ flinkImage: cluster.flinkImage, serviceAccount: cluster.serviceAccount, k8sConf: cluster.k8sConf, - flameGraph: cluster.flameGraph, k8sNamespace: cluster.k8sNamespace, ...resetParams, }); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts index 77de7dca3..74698ad46 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/setting/hooks/useClusterSetting.ts @@ -34,7 +34,7 @@ import { fetchK8sNamespaces, fetchSessionClusterIds, } from '/@/api/flink/app/flinkHistory'; -import { handleFormValue, isSessionMode } from '../../app/utils'; +import { handleFormValue } from '../../app/utils'; import { useMessage } from '/@/hooks/web/useMessage'; import { ClusterAddTypeEnum } from '/@/enums/appEnum'; import { useI18n } from '/@/hooks/web/useI18n'; @@ -89,6 +89,22 @@ export const useClusterSetting = () => { } } } + + function isAddExistYarnSession(value: Recordable) { + return ( + value.executionMode == ExecModeEnum.YARN_SESSION && + value.addType == ClusterAddTypeEnum.ADD_EXISTING + ); + } + + // session mode + function isShowInSessionMode(value: Recordable): boolean { + if (value.executionMode == ExecModeEnum.YARN_SESSION) { + return value.addType == ClusterAddTypeEnum.ADD_NEW; + } + return value.executionMode == ExecModeEnum.KUBERNETES_SESSION; + } + const getClusterSchema = computed((): FormSchema[] => { return [ { @@ -127,18 +143,10 @@ export const useClusterSetting = () => { }, rules: [{ required: true, message: 'Flink Version is required' }], }, - { - field: 'yarnQueue', - label: 'Yarn Queue', - component: 'Input', - componentProps: { - placeholder: 'Please enter yarn queue', - }, - ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION, - }, { field: 'addType', label: t('flink.setting.cluster.form.addType'), + ifShow: ({ values }) => values.executionMode == ExecModeEnum.YARN_SESSION, component: 'Select', defaultValue: ClusterAddTypeEnum.ADD_EXISTING, componentProps: { @@ -156,6 +164,17 @@ export const useClusterSetting = () => { ], }, }, + { + field: 'yarnQueue', + label: 'Yarn Queue', + component: 'Input', + componentProps: { + placeholder: 'Please enter yarn queue', + }, + ifShow: ({ values }) => + values.executionMode == ExecModeEnum.YARN_SESSION && + values.addType == ClusterAddTypeEnum.ADD_NEW, + }, { field: 'address', label: 'Address', @@ -168,7 +187,8 @@ export const useClusterSetting = () => { : 'Please enter cluster address, e.g: http://host:port', }; }, - ifShow: ({ values }) => values.addType == ClusterAddTypeEnum.ADD_EXISTING, + ifShow: ({ values }) => + isAddExistYarnSession(values) || values.executionMode == ExecModeEnum.REMOTE, rules: [{ required: true, message: t('flink.setting.cluster.required.address') }], }, { @@ -178,9 +198,7 @@ export const useClusterSetting = () => { componentProps: { placeholder: 'Please enter Yarn Session clusterId', }, - ifShow: ({ values }) => - values.addType == ClusterAddTypeEnum.ADD_EXISTING && - values.executionMode == ExecModeEnum.YARN_SESSION, + ifShow: ({ values }) => isAddExistYarnSession(values), rules: [{ required: true, message: t('flink.setting.cluster.required.clusterId') }], }, { @@ -254,7 +272,7 @@ export const useClusterSetting = () => { { field: 'resolveOrder', label: 'Resolve Order', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', componentProps: { placeholder: 'classloader.resolve-order', options: resolveOrder }, rules: [{ required: true, message: 'Resolve Order is required', type: 'number' }], @@ -262,7 +280,7 @@ export const useClusterSetting = () => { { field: 'slot', label: 'Task Slots', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'InputNumber', componentProps: { placeholder: 'Number of slots per TaskManager', @@ -274,14 +292,14 @@ export const useClusterSetting = () => { { field: 'totalOptions', label: 'Total Memory Options', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', render: (renderCallbackParams) => renderTotalMemory(renderCallbackParams), }, { field: 'totalItem', label: 'totalItem', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', renderColContent: ({ model, field }) => renderOptionsItems(model, 'totalOptions', field, '.memory', true), @@ -289,7 +307,7 @@ export const useClusterSetting = () => { { field: 'jmOptions', label: 'JM Memory Options', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', componentProps: { showSearch: true, @@ -304,7 +322,7 @@ export const useClusterSetting = () => { { field: 'jmOptionsItem', label: 'jmOptionsItem', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', renderColContent: ({ model, field }) => renderOptionsItems(model, 'jmOptions', field, 'jobmanager.memory.'), @@ -312,7 +330,7 @@ export const useClusterSetting = () => { { field: 'tmOptions', label: 'TM Memory Options', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', componentProps: { showSearch: true, @@ -327,7 +345,7 @@ export const useClusterSetting = () => { { field: 'tmOptionsItem', label: 'tmOptionsItem', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Select', renderColContent: ({ model, field }) => renderOptionsItems(model, 'tmOptions', field, 'taskmanager.memory.'), @@ -335,7 +353,7 @@ export const useClusterSetting = () => { { field: 'dynamicProperties', label: 'Dynamic Properties', - ifShow: ({ values }) => isSessionMode(values.executionMode), + ifShow: ({ values }) => isShowInSessionMode(values), component: 'Input', render: (renderCallbackParams) => renderDynamicProperties(renderCallbackParams), }, @@ -366,14 +384,18 @@ export const useClusterSetting = () => { }); return params; case ExecModeEnum.YARN_SESSION: - Object.assign(params, { - options: JSON.stringify(options), - yarnQueue: values.yarnQueue || 'default', - dynamicProperties: values.dynamicProperties, - resolveOrder: values.resolveOrder, - address: values.address, - flameGraph: values.flameGraph, - }); + if (values.addType === ClusterAddTypeEnum.ADD_EXISTING) { + Object.assign(params, { + address: values.address, + }); + } else { + Object.assign(params, { + options: JSON.stringify(options), + yarnQueue: values.yarnQueue || 'default', + dynamicProperties: values.dynamicProperties, + resolveOrder: values.resolveOrder, + }); + } return params; case ExecModeEnum.KUBERNETES_SESSION: Object.assign(params, { @@ -386,7 +408,6 @@ export const useClusterSetting = () => { k8sConf: values.k8sConf, flinkImage: values.flinkImage || null, address: values.address, - flameGraph: values.flameGraph, }); return params; default: diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala index 278d46758..264653d49 100644 --- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala +++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala @@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions case class DeployRequest(flinkVersion: FlinkVersion, clusterId: String, executionMode: ExecutionMode, - flameGraph: JavaMap[String, java.io.Serializable], properties: JavaMap[String, Any], @Nullable k8sDeployParam: KubernetesDeployParam) { diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala index 9e91914ff..9872d082a 100644 --- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala +++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala @@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo | exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType} | serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount} | flinkImage : ${deployRequest.k8sDeployParam.flinkImage} - | flameGraph : ${deployRequest.flameGraph != null} | properties : ${deployRequest.properties.mkString(" ")} |------------------------------------------------------------------------------------------- |""".stripMargin) @@ -158,7 +157,7 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient } if (client.getWebInterfaceURL != null) { - DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString) + DeployResponse(client.getWebInterfaceURL, client.getClusterId) } else { null } diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala index 21b216e77..128771131 100644 --- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala +++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala @@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait { | flinkVersion : ${deployRequest.flinkVersion.version} | execMode : ${deployRequest.executionMode.name()} | clusterId : ${deployRequest.clusterId} - | flameGraph : ${deployRequest.flameGraph != null} | properties : ${deployRequest.properties.mkString(" ")} |------------------------------------------------------------------------------------------- |""".stripMargin)
