This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 933271e5b [Improve] flink job release state improvement
933271e5b is described below
commit 933271e5b756ac761d5db83a3120c05f841d214c
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 23:38:26 2024 +0800
[Improve] flink job release state improvement
---
.../core/task/FlinkK8sChangeEventListener.java | 4 +++
.../src/enums/flinkEnum.ts | 2 +-
.../src/locales/lang/en/flink/app.ts | 1 +
.../src/locales/lang/en/setting/flinkCluster.ts | 2 +-
.../src/locales/lang/zh-CN/flink/app.ts | 39 +++++++++++----------
.../src/locales/lang/zh-CN/setting/flinkCluster.ts | 40 +++++++++++-----------
.../src/views/flink/app/EditFlink.vue | 2 +-
.../src/views/flink/app/EditStreamPark.vue | 2 +-
.../flink/app/components/AppView/AppDashboard.vue | 14 ++++----
.../src/views/flink/app/components/State.tsx | 8 ++---
.../src/views/flink/app/data/detail.data.ts | 2 +-
.../src/views/flink/app/data/index.ts | 6 ++--
.../src/views/flink/app/hooks/useApp.tsx | 6 +++-
.../flink/app/hooks/useCreateAndEditSchema.ts | 4 +--
.../src/views/flink/app/utils/index.ts | 4 +--
.../src/views/setting/FlinkCluster/index.vue | 26 ++++++++++----
.../setting/FlinkCluster/useClusterSetting.ts | 8 ++---
.../flink/kubernetes/FlinkK8sWatchController.scala | 5 +--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 32 ++++++++++-------
19 files changed, 118 insertions(+), 89 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index a006aa085..ec1791074 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.enums.OptionState;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.alert.AlertService;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -64,6 +65,9 @@ public class FlinkK8sChangeEventListener {
private static final Cache<Long, Byte> SAVEPOINT_CACHE =
Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
+ @Autowired private FlinkK8sWatcherWrapper flinkK8sWatcherWrapper;
+
+ @Autowired private FlinkK8sWatcher registerFlinkK8sWatcher;
/**
* Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update
diff --git
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index e1194e2b4..030bc0711 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -13,7 +13,7 @@ export enum BuildStateEnum {
/* ExecutionMode */
export enum ExecModeEnum {
/** remote (standalone) */
- REMOTE = 1,
+ STANDALONE = 1,
/** yarn per-job (deprecated, please use yarn-application mode) */
YARN_PER_JOB = 2,
/** yarn session */
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index c1dfd92e3..919d15b54 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -76,6 +76,7 @@ export default {
starting: 'STARTING',
restarting: 'RESTARTING',
running: 'RUNNING',
+ savepoint: 'SAVEPOINT',
failing: 'FAILING',
failed: 'FAILED',
lost: 'LOST',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
index 2e7c48e91..5b3557fcc 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
@@ -50,7 +50,7 @@ export default {
clusterName: 'Please enter cluster name',
executionMode: 'Please enter cluster name',
versionId: 'please select Flink Version',
- addressRemoteMode: 'Please enter jobManager URL',
+ addressRemoteMode: 'Please type jobManager URL',
addressNoRemoteMode: 'Please enter cluster address, e.g: http://host:port',
yarnSessionClusterId: 'Please enter Yarn Session cluster',
k8sConf: '~/.kube/config',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 151512868..da35e2893 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -20,14 +20,14 @@ export default {
searchName: '作业名称',
tags: '作业标签',
owner: '创建者',
- flinkVersion: 'Flink版本',
+ flinkVersion: 'Flink 版本',
duration: '运行时长',
modifiedTime: '修改时间',
runStatus: '运行状态',
releaseBuild: '发布状态',
jobType: '作业类型',
developmentMode: '作业模式',
- executionMode: '执行模式',
+ executionMode: '运行模式',
historyVersion: '历史版本',
dependency: '作业依赖',
appConf: '作业配置',
@@ -35,16 +35,16 @@ export default {
parallelism: '并行度',
restartSize: '(失败后)重启次数',
faultAlertTemplate: '告警模板',
- checkPointFailureOptions: 'Checkpoint告警策略',
+ checkPointFailureOptions: 'Checkpoint 告警策略',
totalMemoryOptions: '总内存',
- jmMemoryOptions: 'JM内存',
- tmMemoryOptions: 'TM内存',
+ jmMemoryOptions: 'JM 内存',
+ tmMemoryOptions: 'TM 内存',
podTemplate: 'Kubernetes Pod 模板',
flinkCluster: 'Flink 集群',
yarnQueue: 'Yarn 队列',
mavenPom: 'maven pom',
uploadJar: '上传依赖Jar文件',
- kubernetesNamespace: 'K8S命名空间',
+ kubernetesNamespace: 'K8S 命名空间',
kubernetesClusterId: 'K8S ClusterId',
kubernetesCluster: 'Kubernetes Session Cluster',
flinkBaseDockerImage: 'Flink基础docker镜像',
@@ -78,6 +78,7 @@ export default {
failed: '作业失败',
lost: '作业失联',
cancelling: '取消中',
+ savepoint: '快照中',
canceled: '已取消',
finished: '已完成',
suspended: '已暂停',
@@ -245,8 +246,8 @@ export default {
addAppTips: {
developmentModePlaceholder: '请选择开发模式',
developmentModeIsRequiredMessage: '开发模式必填',
- executionModePlaceholder: '请选择执行模式',
- executionModeIsRequiredMessage: '执行模式必填',
+ executionModePlaceholder: '请选择运行模式',
+ executionModeIsRequiredMessage: '运行模式必填',
hadoopEnvInitMessage: 'hadoop环境检查失败, 请检查配置',
resourceFromMessage: '资源来源必填',
mainClassPlaceholder: '请输入程序入口类',
@@ -257,16 +258,16 @@ export default {
appTypePlaceholder: '请选择作业类型',
appTypeIsRequiredMessage: '作业类型必填',
programJarIsRequiredMessage: '程序jar文件必填',
- useSysHadoopConf: '使用系统hadoop配置',
- flinkVersionIsRequiredMessage: 'Flink版本必填',
+ useSysHadoopConf: '使用系统 hadoop 配置',
+ flinkVersionIsRequiredMessage: 'Flink 版本必填',
appNamePlaceholder: '请输入作业名称',
appNameIsRequiredMessage: '作业名称必填',
appNameNotUniqueMessage: '作业名称必须唯一, 输入的作业名称已经存在',
- appNameExistsInYarnMessage: '该作业名称已经在YARN集群中存在,不能重复。请检查',
- appNameExistsInK8sMessage: '该作业名称已经在K8S集群中存在,不能重复。请检查',
+ appNameExistsInYarnMessage: '该作业名称已经在 YARN 集群中存在,不能重复。请检查',
+ appNameExistsInK8sMessage: '该作业名称已经在 K8S 集群中存在,不能重复。请检查',
appNameValid: '作业名称不合法',
appNameRole: '作业必须遵循以下规则:',
- K8sSessionClusterIdRole: 'K8S集群ID必要遵循以下规则:',
+ K8sSessionClusterIdRole: 'K8S 集群ID必要遵循以下规则:',
appNameK8sClusterIdRole:
'当前部署模式是 K8s Application模式,会将作业名称作为k8s的 clusterId,因此作业名称要遵循以下规则:',
appNameK8sClusterIdRoleLength: '不应超过 45 个字符',
@@ -282,12 +283,12 @@ export default {
alertTemplatePlaceholder: '告警模板',
totalMemoryOptionsPlaceholder: '请选择要设置的资源参数',
tmPlaceholder: '请选择要设置的资源参数',
- yarnQueuePlaceholder: '请输入yarn队列标签名称',
+ yarnQueuePlaceholder: '请输入 YARN 队列 标签名称',
descriptionPlaceholder: '请输入此作业的描述',
serviceAccountPlaceholder: '请输入K8S服务账号(service-account)',
- kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
- kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
- kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
+ kubernetesNamespacePlaceholder: '请输入 K8S 命名空间, 如: default',
+ kubernetesClusterIdPlaceholder: '请选择 K8S ClusterId',
+ kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId 必填',
flinkImagePlaceholder:
'请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
flinkImageIsRequiredMessage: 'Flink基础docker镜像是必填的',
k8sRestExposedTypePlaceholder: 'K8S服务对外类型',
@@ -300,8 +301,8 @@ export default {
minute: '分钟',
count: '次数',
officialDoc: '官网文档',
- checkPointFailureOptions: 'CheckPoint失败策略',
- checkpointFailureRateInterval: 'checkpoint失败间隔',
+ checkPointFailureOptions: 'CheckPoint 失败策略',
+ checkpointFailureRateInterval: 'checkpoint 失败间隔',
maxFailuresPerInterval: '最大失败次数',
checkPointFailureNote:
'checkpoint 失败处理策略, 例如: 在 5 分钟内 (checkpoint的失败间隔), 如果 checkpoint 失败次数超过 10 次 (checkpoint最大失败次数),会触发操作(发送告警或者重启作业)',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
index eaf9e396a..495302739 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
@@ -24,48 +24,48 @@ export default {
form: {
clusterName: '集群名称',
address: '集群URL',
- executionMode: '执行模式',
- versionId: 'Flink版本',
+ executionMode: '运行模式',
+ versionId: 'Flink 版本',
addType: '添加类型',
addExisting: '已有集群',
addNew: '全新集群',
- yarnQueue: 'Yarn队列',
- yarnSessionClusterId: 'Yarn Session模式集群ID',
- k8sNamespace: 'K8S命名空间',
- k8sClusterId: 'K8S集群ID',
- k8sSessionCluster: 'k8s Session集群',
- serviceAccount: 'K8S服务账号',
- k8sConf: 'K8S环境Kube配置文件',
- flinkImage: 'Flink基础docker镜像',
- k8sRestExposedType: 'K8S服务对外类型',
+ yarnQueue: 'Yarn 队列',
+ yarnSessionClusterId: 'Yarn Session 模式集群ID',
+ k8sNamespace: 'K8S 命名空间',
+ k8sClusterId: 'K8S 集群ID',
+ k8sSessionCluster: 'k8s Session 集群',
+ serviceAccount: 'K8S 服务账号',
+ k8sConf: 'K8S 环境Kube配置文件',
+ flinkImage: 'Flink 基础docker镜像',
+ k8sRestExposedType: 'K8S 服务对外类型',
resolveOrder: '类加载顺序',
taskSlots: '任务槽数',
- jmOptions: 'JM内存',
- tmOptions: 'TM内存',
+ jmOptions: 'JM 内存',
+ tmOptions: 'TM 内存',
dynamicProperties: '动态参数',
clusterDescription: '集群描述',
},
placeholder: {
addType: '请选择集群添加类型',
clusterName: '请输入集群名称',
- executionMode: '请选择执行模式',
- versionId: '请选择Flink版本',
- addressRemoteMode: '请输入Flink 集群JobManager URL访问地址',
+ executionMode: '请选择运行模式',
+ versionId: '请选择 Flink 版本',
+ addressRemoteMode: '请输入 Flink 集群 JobManager URL 访问地址',
addressNoRemoteMode: '请输入集群地址,例如:http://host:port',
yarnSessionClusterId: '请输入Yarn Session模式集群ID',
k8sConf: '示例:~/.kube/config',
flinkImage: '请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
k8sRestExposedType: 'kubernetes.rest-service.exposed.type',
resolveOrder: 'classloader.resolve-order',
- taskSlots: '每个TaskManager的插槽数',
+ taskSlots: '每个 TaskManager 的插槽数',
totalOptions: '总内存',
- jmOptions: '请选择要设置的jm资源参数',
- tmOptions: '请选择要设置的tm资源参数',
+ jmOptions: '请选择要设置的 JobManager 资源参数',
+ tmOptions: '请选择要设置的 TaskManager 资源参数',
clusterDescription: '集群描述信息, 如: 生产flink 1.16集群',
},
required: {
address: '必须填写集群地址',
- executionMode: '执行模式必填',
+ executionMode: '运行模式必填',
clusterId: 'Yarn Session Cluster 为必填项',
versionId: 'Flink 版本必选',
flinkImage: 'link基础docker镜像是必填的',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 8f40c8f2a..e23707599 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -118,7 +118,7 @@
Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
} else if (app.executionMode == ExecModeEnum.YARN_SESSION) {
Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
- } else if (app.executionMode == ExecModeEnum.REMOTE) {
+ } else if (app.executionMode == ExecModeEnum.STANDALONE) {
}
if (!executionMode) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 0ff995bec..1b8ff0a06 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -127,7 +127,7 @@
};
switch (app.executionMode) {
- case ExecModeEnum.REMOTE:
+ case ExecModeEnum.STANDALONE:
defaultParams['remoteClusterId'] = app.flinkClusterId;
break;
case ExecModeEnum.YARN_SESSION:
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
index 4f6559f37..395309a45 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
@@ -32,6 +32,13 @@
const res = await fetchDashboard();
if (res) {
Object.assign(dashBigScreenMap, {
+ runningJob: {
+ staticstics: { title: t('flink.app.dashboard.runningJobs'), value:
res.runningJob },
+ footer: [
+ { title: t('flink.app.dashboard.totalTask'), value:
res.task.total },
+ { title: t('flink.app.dashboard.runningTask'), value:
res.task.running },
+ ],
+ },
availiableTask: {
staticstics: {
title: t('flink.app.dashboard.availableTaskSlots'),
@@ -42,13 +49,6 @@
{ title: t('flink.app.dashboard.taskManagers'), value:
res.totalTM },
],
},
- runningJob: {
- staticstics: { title: t('flink.app.dashboard.runningJobs'), value:
res.runningJob },
- footer: [
- { title: t('flink.app.dashboard.totalTask'), value:
res.task.total },
- { title: t('flink.app.dashboard.runningTask'), value:
res.task.running },
- ],
- },
jobManager: {
staticstics: { title: t('flink.app.dashboard.jobManagerMemory'),
value: res.jmMemory },
footer: [
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
index 564b32d01..8aae8b47e 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
@@ -78,22 +78,22 @@ export const stateMap = {
export const optionStateMap = {
[OptionStateEnum.RELEASING]: {
color: '#1ABBDC',
- title: 'RELEASING',
+ title: t('flink.app.releaseState.releasing'),
class: 'status-processing-deploying',
},
[OptionStateEnum.CANCELLING]: {
color: '#faad14',
- title: 'CANCELLING',
+ title: t('flink.app.runState.cancelling'),
class: 'status-processing-cancelling',
},
[OptionStateEnum.STARTING]: {
color: '#1AB58E',
- title: 'STARTING',
+ title: t('flink.app.runState.starting'),
class: 'status-processing-starting',
},
[OptionStateEnum.SAVEPOINTING]: {
color: '#faad14',
- title: 'SAVEPOINT',
+ title: t('flink.app.runState.savepoint'),
class: 'status-processing-cancelling',
},
};
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
index fc5d884c7..7e71e2a69 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
@@ -132,7 +132,7 @@ export const getBackupColumns = (): BasicColumn[] => [
export const getOptionLogColumns = (): BasicColumn[] => [
{ title: 'Operation Name', dataIndex: 'optionName', width: 150 },
- { title: 'Application Id', dataIndex: 'yarnAppId' },
+ { title: 'Cluster Id', dataIndex: 'yarnAppId' },
{ title: 'JobManager URL', dataIndex: 'jobManagerUrl' },
{ title: 'Start Status', dataIndex: 'success', width: 120 },
{ title: 'Option Time', dataIndex: 'optionTime', width: 200 },
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
index fa1eed09c..007ece8d8 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
@@ -61,12 +61,12 @@ export const k8sRestExposedType = [
];
export const executionModes = [
- { label: 'remote', value: ExecModeEnum.REMOTE, disabled: false },
+ { label: 'standalone', value: ExecModeEnum.STANDALONE, disabled: false },
{ label: 'yarn application', value: ExecModeEnum.YARN_APPLICATION, disabled:
false },
{ label: 'yarn session', value: ExecModeEnum.YARN_SESSION, disabled: false },
- { label: 'kubernetes session', value: ExecModeEnum.KUBERNETES_SESSION,
disabled: false },
+ { label: 'k8s session', value: ExecModeEnum.KUBERNETES_SESSION, disabled:
false },
{
- label: 'kubernetes application',
+ label: 'k8s application',
value: ExecModeEnum.KUBERNETES_APPLICATION,
disabled: false,
},
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index c22f3a46a..e457c2b52 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -336,7 +336,11 @@ export const useFlinkApplication = (openStartModal: Fn) =>
{
name="clusterId"
rules={[{ required: true, message: 'YARN ApplicationId is required' }]}
>
- <Input type="text" placeholder="ApplicationId"
v-model:value={formValue.clusterId} />
+ <Input
+ type="text"
+ placeholder="ApplicationId"
+ v-model:value={formValue.clusterId}
+ />
</Form.Item>
)}
<Form.Item
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 09e58f493..a98aa9395 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -196,8 +196,8 @@ export const useCreateAndEditSchema = (
field: 'remoteClusterId',
label: t('flink.app.flinkCluster'),
component: 'Select',
- render: (param) =>
renderFlinkCluster(getExecutionCluster(ExecModeEnum.REMOTE), param),
- ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
+ render: (param) =>
renderFlinkCluster(getExecutionCluster(ExecModeEnum.STANDALONE), param),
+ ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.STANDALONE,
rules: [
{ required: true, message:
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
],
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 9bcc01284..626113cd6 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -105,7 +105,7 @@ export function descriptionFilter(option) {
export async function handleView(app: AppListRecord, yarn: Nullable<string>) {
const executionMode = app['executionMode'];
- if (executionMode == ExecModeEnum.REMOTE) {
+ if (executionMode == ExecModeEnum.STANDALONE) {
const res = await fetchRemoteURL(app.flinkClusterId);
window.open(res + '/#/job/' + app.jobId + '/overview');
} else if (
@@ -269,7 +269,7 @@ function getFlinkClusterId(values: Recordable) {
if (values.executionMode == ExecModeEnum.YARN_SESSION) {
return values.yarnSessionClusterId;
}
- if (values.executionMode == ExecModeEnum.REMOTE) {
+ if (values.executionMode == ExecModeEnum.STANDALONE) {
return values.remoteClusterId;
}
if (values.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
index 2e43765f5..4137459b1 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
@@ -25,7 +25,7 @@
<script lang="ts" setup name="FlinkClusterSetting">
import { onMounted, ref } from 'vue';
import { SvgIcon } from '/@/components/Icon';
- import { List, Popconfirm, Tooltip, Card } from 'ant-design-vue';
+ import { List, Popconfirm, Tooltip, Card, Tag } from 'ant-design-vue';
import { ClusterStateEnum, ExecModeEnum } from '/@/enums/flinkEnum';
import {
PauseCircleOutlined,
@@ -156,16 +156,28 @@
<div class="list-content" style="width: 20%">
<div class="list-content-item">
<span>{{ t('setting.flinkCluster.form.executionMode') }}</span>
- <p style="margin-top: 10px">
- {{ item.executionModeEnum.toLowerCase() }}
+ <p style="margin-top: 10px" v-if="item.executionMode ===
ExecModeEnum.STANDALONE">
+ <Tag color="#2db7f5">standalone</Tag>
+ </p>
+ <p
+ style="margin-top: 10px"
+ v-else-if="item.executionMode === ExecModeEnum.YARN_SESSION"
+ >
+ <Tag color="#87d068">yarn session</Tag>
+ </p>
+ <p
+ style="margin-top: 10px"
+ v-else-if="item.executionMode ===
ExecModeEnum.KUBERNETES_SESSION"
+ >
+ <Tag color="#108ee9">k8s session</Tag>
</p>
</div>
</div>
<div
class="list-content"
- style="width: 30%"
+ style="width: 35%"
v-if="
- item.executionMode === ExecModeEnum.REMOTE ||
+ item.executionMode === ExecModeEnum.STANDALONE ||
item.executionMode === ExecModeEnum.YARN_SESSION
"
>
@@ -194,7 +206,7 @@
<template v-if="handleIsStart(item)">
<Tooltip :title="t('setting.flinkCluster.stop')">
<a-button
- :disabled="item.executionMode === ExecModeEnum.REMOTE"
+ :disabled="item.executionMode === ExecModeEnum.STANDALONE"
v-auth="'cluster:create'"
@click="handleShutdownCluster(item)"
shape="circle"
@@ -209,7 +221,7 @@
<template v-else>
<Tooltip :title="t('setting.flinkCluster.start')">
<a-button
- :disabled="item.executionMode === ExecModeEnum.REMOTE"
+ :disabled="item.executionMode === ExecModeEnum.STANDALONE"
v-auth="'cluster:create'"
@click="handleDeployCluster(item)"
shape="circle"
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
index 14451b34e..7dd966d1d 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
@@ -118,8 +118,8 @@ export const useClusterSetting = () => {
placeholder: t('setting.flinkCluster.placeholder.executionMode'),
options: [
{
- label: 'remote',
- value: ExecModeEnum.REMOTE,
+ label: 'standalone',
+ value: ExecModeEnum.STANDALONE,
},
{ label: 'yarn session', value: ExecModeEnum.YARN_SESSION },
{ label: 'kubernetes session', value:
ExecModeEnum.KUBERNETES_SESSION },
@@ -147,7 +147,7 @@ export const useClusterSetting = () => {
componentProps: {
placeholder: t('setting.flinkCluster.placeholder.addressRemoteMode'),
},
- ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
+ ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.STANDALONE,
rules: [{ required: true, message:
t('setting.flinkCluster.required.address') }],
},
{
@@ -333,7 +333,7 @@ export const useClusterSetting = () => {
};
switch (values.executionMode) {
- case ExecModeEnum.REMOTE:
+ case ExecModeEnum.STANDALONE:
Object.assign(params, {
address: values.address,
});
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index 584b1df6f..bc9d48208 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -81,10 +81,11 @@ class FlinkK8sWatchController extends Logger with
AutoCloseable {
def collectAccGroupMetric(groupId: String): FlinkMetricCV = {
// get cluster metrics that in tracking
val empty = FlinkMetricCV.empty(groupId)
- getActiveWatchingIds() match {
+ getActiveWatchingIds().filter(_.groupId == groupId) match {
case k if k.isEmpty => empty
case k =>
- flinkMetrics.getAll(for (elem <- k) yield ClusterKey.of(elem)) match {
+ flinkMetrics.getAll(
+ for (elem <- k if elem.groupId == groupId) yield ClusterKey.of(elem)) match {
case m if m.isEmpty => empty
case m => m.values.fold(empty)((x, y) => x + y)
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 9c5590f0a..9243e10f7 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.APPLICATION
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.model._
@@ -126,7 +126,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
case _ =>
touchSessionJob(trackId) match {
case Some(state) =>
- if (state.jobState == FlinkJobState.LOST) {
+ if (FlinkJobState.isEndState(state.jobState)) {
// can't find that job in the k8s cluster.
watchController.unWatching(trackId)
}
@@ -222,16 +222,19 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
- lazy val deployExists = KubernetesRetriever.isDeploymentExists(
- trackId.namespace,
- trackId.clusterId
- )
-
- if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) {
- // remove trackId from cache of job that needs to be untracked
- watchController.unWatching(trackId)
- if (trackId.executeMode == APPLICATION) {
- watchController.endpoints.invalidate(trackId.toClusterKey)
+ if (FlinkJobState.isEndState(jobState.jobState)) {
+ trackId.executeMode match {
+ case APPLICATION =>
+ val deployExists = KubernetesRetriever.isDeploymentExists(
+ trackId.namespace,
+ trackId.clusterId
+ )
+ if (!deployExists) {
+ watchController.endpoints.invalidate(trackId.toClusterKey)
+ watchController.unWatching(trackId)
+ }
+ case SESSION =>
+ watchController.unWatching(trackId)
}
}
}
@@ -307,6 +310,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
logger.info(s"trackId ${trackId.toString} is canceling")
+ watchController.trackIds.invalidate(id)
FlinkJobState.CANCELED
case _ =>
// whether deployment exists on kubernetes cluster
@@ -386,7 +390,9 @@ object FlinkJobStatusWatcher {
current match {
case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
previous match {
- case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
+ case FlinkJobState.CANCELLING => {
+ FlinkJobState.CANCELED
+ }
case FlinkJobState.FAILING => FlinkJobState.FAILED
case _ =>
current match {