This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 933271e5b [Improve] flink job release state improvement
933271e5b is described below

commit 933271e5b756ac761d5db83a3120c05f841d214c
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 23:38:26 2024 +0800

    [Improve] flink job release state improvement
---
 .../core/task/FlinkK8sChangeEventListener.java     |  4 +++
 .../src/enums/flinkEnum.ts                         |  2 +-
 .../src/locales/lang/en/flink/app.ts               |  1 +
 .../src/locales/lang/en/setting/flinkCluster.ts    |  2 +-
 .../src/locales/lang/zh-CN/flink/app.ts            | 39 +++++++++++----------
 .../src/locales/lang/zh-CN/setting/flinkCluster.ts | 40 +++++++++++-----------
 .../src/views/flink/app/EditFlink.vue              |  2 +-
 .../src/views/flink/app/EditStreamPark.vue         |  2 +-
 .../flink/app/components/AppView/AppDashboard.vue  | 14 ++++----
 .../src/views/flink/app/components/State.tsx       |  8 ++---
 .../src/views/flink/app/data/detail.data.ts        |  2 +-
 .../src/views/flink/app/data/index.ts              |  6 ++--
 .../src/views/flink/app/hooks/useApp.tsx           |  6 +++-
 .../flink/app/hooks/useCreateAndEditSchema.ts      |  4 +--
 .../src/views/flink/app/utils/index.ts             |  4 +--
 .../src/views/setting/FlinkCluster/index.vue       | 26 ++++++++++----
 .../setting/FlinkCluster/useClusterSetting.ts      |  8 ++---
 .../flink/kubernetes/FlinkK8sWatchController.scala |  5 +--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 32 ++++++++++-------
 19 files changed, 118 insertions(+), 89 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index a006aa085..ec1791074 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.enums.OptionState;
 import org.apache.streampark.console.core.metrics.flink.CheckPoints;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.alert.AlertService;
+import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -64,6 +65,9 @@ public class FlinkK8sChangeEventListener {
 
   private static final Cache<Long, Byte> SAVEPOINT_CACHE =
       Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
+  @Autowired private FlinkK8sWatcherWrapper flinkK8sWatcherWrapper;
+
+  @Autowired private FlinkK8sWatcher registerFlinkK8sWatcher;
 
   /**
    * Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update
diff --git 
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts 
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index e1194e2b4..030bc0711 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -13,7 +13,7 @@ export enum BuildStateEnum {
 /* ExecutionMode  */
 export enum ExecModeEnum {
   /** remote (standalone) */
-  REMOTE = 1,
+  STANDALONE = 1,
   /** yarn per-job (deprecated, please use yarn-application mode) */
   YARN_PER_JOB = 2,
   /** yarn session */
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index c1dfd92e3..919d15b54 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -76,6 +76,7 @@ export default {
     starting: 'STARTING',
     restarting: 'RESTARTING',
     running: 'RUNNING',
+    savepoint: 'SAVEPOINT',
     failing: 'FAILING',
     failed: 'FAILED',
     lost: 'LOST',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
index 2e7c48e91..5b3557fcc 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkCluster.ts
@@ -50,7 +50,7 @@ export default {
     clusterName: 'Please enter cluster name',
     executionMode: 'Please enter cluster name',
     versionId: 'please select Flink Version',
-    addressRemoteMode: 'Please enter jobManager URL',
+    addressRemoteMode: 'Please type jobManager URL',
     addressNoRemoteMode: 'Please enter cluster address,  e.g: http://host:port',
     yarnSessionClusterId: 'Please enter Yarn Session cluster',
     k8sConf: '~/.kube/config',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 151512868..da35e2893 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -20,14 +20,14 @@ export default {
   searchName: '作业名称',
   tags: '作业标签',
   owner: '创建者',
-  flinkVersion: 'Flink版本',
+  flinkVersion: 'Flink 版本',
   duration: '运行时长',
   modifiedTime: '修改时间',
   runStatus: '运行状态',
   releaseBuild: '发布状态',
   jobType: '作业类型',
   developmentMode: '作业模式',
-  executionMode: '执行模式',
+  executionMode: '运行模式',
   historyVersion: '历史版本',
   dependency: '作业依赖',
   appConf: '作业配置',
@@ -35,16 +35,16 @@ export default {
   parallelism: '并行度',
   restartSize: '(失败后)重启次数',
   faultAlertTemplate: '告警模板',
-  checkPointFailureOptions: 'Checkpoint告警策略',
+  checkPointFailureOptions: 'Checkpoint 告警策略',
   totalMemoryOptions: '总内存',
-  jmMemoryOptions: 'JM内存',
-  tmMemoryOptions: 'TM内存',
+  jmMemoryOptions: 'JM 内存',
+  tmMemoryOptions: 'TM 内存',
   podTemplate: 'Kubernetes Pod 模板',
   flinkCluster: 'Flink 集群',
   yarnQueue: 'Yarn 队列',
   mavenPom: 'maven pom',
   uploadJar: '上传依赖Jar文件',
-  kubernetesNamespace: 'K8S命名空间',
+  kubernetesNamespace: 'K8S 命名空间',
   kubernetesClusterId: 'K8S ClusterId',
   kubernetesCluster: 'Kubernetes Session Cluster',
   flinkBaseDockerImage: 'Flink基础docker镜像',
@@ -78,6 +78,7 @@ export default {
     failed: '作业失败',
     lost: '作业失联',
     cancelling: '取消中',
+    savepoint: '快照中',
     canceled: '已取消',
     finished: '已完成',
     suspended: '已暂停',
@@ -245,8 +246,8 @@ export default {
   addAppTips: {
     developmentModePlaceholder: '请选择开发模式',
     developmentModeIsRequiredMessage: '开发模式必填',
-    executionModePlaceholder: '请选择执行模式',
-    executionModeIsRequiredMessage: '执行模式必填',
+    executionModePlaceholder: '请选择运行模式',
+    executionModeIsRequiredMessage: '运行模式必填',
     hadoopEnvInitMessage: 'hadoop环境检查失败, 请检查配置',
     resourceFromMessage: '资源来源必填',
     mainClassPlaceholder: '请输入程序入口类',
@@ -257,16 +258,16 @@ export default {
     appTypePlaceholder: '请选择作业类型',
     appTypeIsRequiredMessage: '作业类型必填',
     programJarIsRequiredMessage: '程序jar文件必填',
-    useSysHadoopConf: '使用系统hadoop配置',
-    flinkVersionIsRequiredMessage: 'Flink版本必填',
+    useSysHadoopConf: '使用系统 hadoop 配置',
+    flinkVersionIsRequiredMessage: 'Flink 版本必填',
     appNamePlaceholder: '请输入作业名称',
     appNameIsRequiredMessage: '作业名称必填',
     appNameNotUniqueMessage: '作业名称必须唯一, 输入的作业名称已经存在',
-    appNameExistsInYarnMessage: '该作业名称已经在YARN集群中存在,不能重复。请检查',
-    appNameExistsInK8sMessage: '该作业名称已经在K8S集群中存在,不能重复。请检查',
+    appNameExistsInYarnMessage: '该作业名称已经在 YARN 集群中存在,不能重复。请检查',
+    appNameExistsInK8sMessage: '该作业名称已经在 K8S 集群中存在,不能重复。请检查',
     appNameValid: '作业名称不合法',
     appNameRole: '作业必须遵循以下规则:',
-    K8sSessionClusterIdRole: 'K8S集群ID必要遵循以下规则:',
+    K8sSessionClusterIdRole: 'K8S 集群ID必要遵循以下规则:',
     appNameK8sClusterIdRole:
       '当前部署模式是 K8s Application模式,会将作业名称作为k8s的 clusterId,因此作业名称要遵循以下规则:',
     appNameK8sClusterIdRoleLength: '不应超过 45 个字符',
@@ -282,12 +283,12 @@ export default {
     alertTemplatePlaceholder: '告警模板',
     totalMemoryOptionsPlaceholder: '请选择要设置的资源参数',
     tmPlaceholder: '请选择要设置的资源参数',
-    yarnQueuePlaceholder: '请输入yarn队列标签名称',
+    yarnQueuePlaceholder: '请输入 YARN 队列 标签名称',
     descriptionPlaceholder: '请输入此作业的描述',
     serviceAccountPlaceholder: '请输入K8S服务账号(service-account)',
-    kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
-    kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
-    kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
+    kubernetesNamespacePlaceholder: '请输入 K8S 命名空间, 如: default',
+    kubernetesClusterIdPlaceholder: '请选择 K8S ClusterId',
+    kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId 必填',
     flinkImagePlaceholder: 
'请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
     flinkImageIsRequiredMessage: 'Flink基础docker镜像是必填的',
     k8sRestExposedTypePlaceholder: 'K8S服务对外类型',
@@ -300,8 +301,8 @@ export default {
     minute: '分钟',
     count: '次数',
     officialDoc: '官网文档',
-    checkPointFailureOptions: 'CheckPoint失败策略',
-    checkpointFailureRateInterval: 'checkpoint失败间隔',
+    checkPointFailureOptions: 'CheckPoint 失败策略',
+    checkpointFailureRateInterval: 'checkpoint 失败间隔',
     maxFailuresPerInterval: '最大失败次数',
     checkPointFailureNote:
       'checkpoint 失败处理策略, 例如: 在 5 分钟内 (checkpoint的失败间隔), 如果 checkpoint 失败次数超过 10 次 (checkpoint最大失败次数),会触发操作(发送告警或者重启作业)',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
index eaf9e396a..495302739 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
@@ -24,48 +24,48 @@ export default {
   form: {
     clusterName: '集群名称',
     address: '集群URL',
-    executionMode: '执行模式',
-    versionId: 'Flink版本',
+    executionMode: '运行模式',
+    versionId: 'Flink 版本',
     addType: '添加类型',
     addExisting: '已有集群',
     addNew: '全新集群',
-    yarnQueue: 'Yarn队列',
-    yarnSessionClusterId: 'Yarn Session模式集群ID',
-    k8sNamespace: 'K8S命名空间',
-    k8sClusterId: 'K8S集群ID',
-    k8sSessionCluster: 'k8s Session集群',
-    serviceAccount: 'K8S服务账号',
-    k8sConf: 'K8S环境Kube配置文件',
-    flinkImage: 'Flink基础docker镜像',
-    k8sRestExposedType: 'K8S服务对外类型',
+    yarnQueue: 'Yarn 队列',
+    yarnSessionClusterId: 'Yarn Session 模式集群ID',
+    k8sNamespace: 'K8S 命名空间',
+    k8sClusterId: 'K8S 集群ID',
+    k8sSessionCluster: 'k8s Session 集群',
+    serviceAccount: 'K8S 服务账号',
+    k8sConf: 'K8S 环境Kube配置文件',
+    flinkImage: 'Flink 基础docker镜像',
+    k8sRestExposedType: 'K8S 服务对外类型',
     resolveOrder: '类加载顺序',
     taskSlots: '任务槽数',
-    jmOptions: 'JM内存',
-    tmOptions: 'TM内存',
+    jmOptions: 'JM 内存',
+    tmOptions: 'TM 内存',
     dynamicProperties: '动态参数',
     clusterDescription: '集群描述',
   },
   placeholder: {
     addType: '请选择集群添加类型',
     clusterName: '请输入集群名称',
-    executionMode: '请选择执行模式',
-    versionId: '请选择Flink版本',
-    addressRemoteMode: '请输入Flink 集群JobManager URL访问地址',
+    executionMode: '请选择运行模式',
+    versionId: '请选择 Flink 版本',
+    addressRemoteMode: '请输入 Flink 集群 JobManager URL 访问地址',
     addressNoRemoteMode: '请输入集群地址,例如:http://host:port',
     yarnSessionClusterId: '请输入Yarn Session模式集群ID',
     k8sConf: '示例:~/.kube/config',
     flinkImage: '请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
     k8sRestExposedType: 'kubernetes.rest-service.exposed.type',
     resolveOrder: 'classloader.resolve-order',
-    taskSlots: '每个TaskManager的插槽数',
+    taskSlots: '每个 TaskManager 的插槽数',
     totalOptions: '总内存',
-    jmOptions: '请选择要设置的jm资源参数',
-    tmOptions: '请选择要设置的tm资源参数',
+    jmOptions: '请选择要设置的 JobManager 资源参数',
+    tmOptions: '请选择要设置的 TaskManager 资源参数',
     clusterDescription: '集群描述信息, 如: 生产flink 1.16集群',
   },
   required: {
     address: '必须填写集群地址',
-    executionMode: '执行模式必填',
+    executionMode: '运行模式必填',
     clusterId: 'Yarn Session Cluster 为必填项',
     versionId: 'Flink 版本必选',
     flinkImage: 'link基础docker镜像是必填的',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 8f40c8f2a..e23707599 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -118,7 +118,7 @@
         Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
       } else if (app.executionMode == ExecModeEnum.YARN_SESSION) {
         Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
-      } else if (app.executionMode == ExecModeEnum.REMOTE) {
+      } else if (app.executionMode == ExecModeEnum.STANDALONE) {
       }
 
       if (!executionMode) {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 0ff995bec..1b8ff0a06 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -127,7 +127,7 @@
       };
 
       switch (app.executionMode) {
-        case ExecModeEnum.REMOTE:
+        case ExecModeEnum.STANDALONE:
           defaultParams['remoteClusterId'] = app.flinkClusterId;
           break;
         case ExecModeEnum.YARN_SESSION:
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
index 4f6559f37..395309a45 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/AppDashboard.vue
@@ -32,6 +32,13 @@
       const res = await fetchDashboard();
       if (res) {
         Object.assign(dashBigScreenMap, {
+          runningJob: {
+            staticstics: { title: t('flink.app.dashboard.runningJobs'), value: 
res.runningJob },
+            footer: [
+              { title: t('flink.app.dashboard.totalTask'), value: 
res.task.total },
+              { title: t('flink.app.dashboard.runningTask'), value: 
res.task.running },
+            ],
+          },
           availiableTask: {
             staticstics: {
               title: t('flink.app.dashboard.availableTaskSlots'),
@@ -42,13 +49,6 @@
               { title: t('flink.app.dashboard.taskManagers'), value: 
res.totalTM },
             ],
           },
-          runningJob: {
-            staticstics: { title: t('flink.app.dashboard.runningJobs'), value: 
res.runningJob },
-            footer: [
-              { title: t('flink.app.dashboard.totalTask'), value: 
res.task.total },
-              { title: t('flink.app.dashboard.runningTask'), value: 
res.task.running },
-            ],
-          },
           jobManager: {
             staticstics: { title: t('flink.app.dashboard.jobManagerMemory'), 
value: res.jmMemory },
             footer: [
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
index 564b32d01..8aae8b47e 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/State.tsx
@@ -78,22 +78,22 @@ export const stateMap = {
 export const optionStateMap = {
   [OptionStateEnum.RELEASING]: {
     color: '#1ABBDC',
-    title: 'RELEASING',
+    title: t('flink.app.releaseState.releasing'),
     class: 'status-processing-deploying',
   },
   [OptionStateEnum.CANCELLING]: {
     color: '#faad14',
-    title: 'CANCELLING',
+    title: t('flink.app.runState.cancelling'),
     class: 'status-processing-cancelling',
   },
   [OptionStateEnum.STARTING]: {
     color: '#1AB58E',
-    title: 'STARTING',
+    title: t('flink.app.runState.starting'),
     class: 'status-processing-starting',
   },
   [OptionStateEnum.SAVEPOINTING]: {
     color: '#faad14',
-    title: 'SAVEPOINT',
+    title: t('flink.app.runState.savepoint'),
     class: 'status-processing-cancelling',
   },
 };
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
index fc5d884c7..7e71e2a69 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
@@ -132,7 +132,7 @@ export const getBackupColumns = (): BasicColumn[] => [
 
 export const getOptionLogColumns = (): BasicColumn[] => [
   { title: 'Operation Name', dataIndex: 'optionName', width: 150 },
-  { title: 'Application Id', dataIndex: 'yarnAppId' },
+  { title: 'Cluster Id', dataIndex: 'yarnAppId' },
   { title: 'JobManager URL', dataIndex: 'jobManagerUrl' },
   { title: 'Start Status', dataIndex: 'success', width: 120 },
   { title: 'Option Time', dataIndex: 'optionTime', width: 200 },
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
index fa1eed09c..007ece8d8 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts
@@ -61,12 +61,12 @@ export const k8sRestExposedType = [
 ];
 
 export const executionModes = [
-  { label: 'remote', value: ExecModeEnum.REMOTE, disabled: false },
+  { label: 'standalone', value: ExecModeEnum.STANDALONE, disabled: false },
   { label: 'yarn application', value: ExecModeEnum.YARN_APPLICATION, disabled: 
false },
   { label: 'yarn session', value: ExecModeEnum.YARN_SESSION, disabled: false },
-  { label: 'kubernetes session', value: ExecModeEnum.KUBERNETES_SESSION, 
disabled: false },
+  { label: 'k8s session', value: ExecModeEnum.KUBERNETES_SESSION, disabled: 
false },
   {
-    label: 'kubernetes application',
+    label: 'k8s application',
     value: ExecModeEnum.KUBERNETES_APPLICATION,
     disabled: false,
   },
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index c22f3a46a..e457c2b52 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -336,7 +336,11 @@ export const useFlinkApplication = (openStartModal: Fn) => 
{
                 name="clusterId"
                 rules={[{ required: true, message: 'YARN ApplicationId is 
required' }]}
               >
-                <Input type="text" placeholder="ApplicationId" 
v-model:value={formValue.clusterId} />
+                <Input
+                  type="text"
+                  placeholder="ApplicationId"
+                  v-model:value={formValue.clusterId}
+                />
               </Form.Item>
             )}
             <Form.Item
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 09e58f493..a98aa9395 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -196,8 +196,8 @@ export const useCreateAndEditSchema = (
         field: 'remoteClusterId',
         label: t('flink.app.flinkCluster'),
         component: 'Select',
-        render: (param) => 
renderFlinkCluster(getExecutionCluster(ExecModeEnum.REMOTE), param),
-        ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
+        render: (param) => 
renderFlinkCluster(getExecutionCluster(ExecModeEnum.STANDALONE), param),
+        ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.STANDALONE,
         rules: [
           { required: true, message: 
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
         ],
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 9bcc01284..626113cd6 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -105,7 +105,7 @@ export function descriptionFilter(option) {
 
 export async function handleView(app: AppListRecord, yarn: Nullable<string>) {
   const executionMode = app['executionMode'];
-  if (executionMode == ExecModeEnum.REMOTE) {
+  if (executionMode == ExecModeEnum.STANDALONE) {
     const res = await fetchRemoteURL(app.flinkClusterId);
     window.open(res + '/#/job/' + app.jobId + '/overview');
   } else if (
@@ -269,7 +269,7 @@ function getFlinkClusterId(values: Recordable) {
   if (values.executionMode == ExecModeEnum.YARN_SESSION) {
     return values.yarnSessionClusterId;
   }
-  if (values.executionMode == ExecModeEnum.REMOTE) {
+  if (values.executionMode == ExecModeEnum.STANDALONE) {
     return values.remoteClusterId;
   }
   if (values.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
index 2e43765f5..4137459b1 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/index.vue
@@ -25,7 +25,7 @@
 <script lang="ts" setup name="FlinkClusterSetting">
   import { onMounted, ref } from 'vue';
   import { SvgIcon } from '/@/components/Icon';
-  import { List, Popconfirm, Tooltip, Card } from 'ant-design-vue';
+  import { List, Popconfirm, Tooltip, Card, Tag } from 'ant-design-vue';
   import { ClusterStateEnum, ExecModeEnum } from '/@/enums/flinkEnum';
   import {
     PauseCircleOutlined,
@@ -156,16 +156,28 @@
           <div class="list-content" style="width: 20%">
             <div class="list-content-item">
               <span>{{ t('setting.flinkCluster.form.executionMode') }}</span>
-              <p style="margin-top: 10px">
-                {{ item.executionModeEnum.toLowerCase() }}
+              <p style="margin-top: 10px" v-if="item.executionMode === 
ExecModeEnum.STANDALONE">
+                <Tag color="#2db7f5">standalone</Tag>
+              </p>
+              <p
+                style="margin-top: 10px"
+                v-else-if="item.executionMode === ExecModeEnum.YARN_SESSION"
+              >
+                <Tag color="#87d068">yarn session</Tag>
+              </p>
+              <p
+                style="margin-top: 10px"
+                v-else-if="item.executionMode === 
ExecModeEnum.KUBERNETES_SESSION"
+              >
+                <Tag color="#108ee9">k8s session</Tag>
               </p>
             </div>
           </div>
           <div
             class="list-content"
-            style="width: 30%"
+            style="width: 35%"
             v-if="
-              item.executionMode === ExecModeEnum.REMOTE ||
+              item.executionMode === ExecModeEnum.STANDALONE ||
               item.executionMode === ExecModeEnum.YARN_SESSION
             "
           >
@@ -194,7 +206,7 @@
             <template v-if="handleIsStart(item)">
               <Tooltip :title="t('setting.flinkCluster.stop')">
                 <a-button
-                  :disabled="item.executionMode === ExecModeEnum.REMOTE"
+                  :disabled="item.executionMode === ExecModeEnum.STANDALONE"
                   v-auth="'cluster:create'"
                   @click="handleShutdownCluster(item)"
                   shape="circle"
@@ -209,7 +221,7 @@
             <template v-else>
               <Tooltip :title="t('setting.flinkCluster.start')">
                 <a-button
-                  :disabled="item.executionMode === ExecModeEnum.REMOTE"
+                  :disabled="item.executionMode === ExecModeEnum.STANDALONE"
                   v-auth="'cluster:create'"
                   @click="handleDeployCluster(item)"
                   shape="circle"
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
index 14451b34e..7dd966d1d 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
@@ -118,8 +118,8 @@ export const useClusterSetting = () => {
           placeholder: t('setting.flinkCluster.placeholder.executionMode'),
           options: [
             {
-              label: 'remote',
-              value: ExecModeEnum.REMOTE,
+              label: 'standalone',
+              value: ExecModeEnum.STANDALONE,
             },
             { label: 'yarn session', value: ExecModeEnum.YARN_SESSION },
             { label: 'kubernetes session', value: 
ExecModeEnum.KUBERNETES_SESSION },
@@ -147,7 +147,7 @@ export const useClusterSetting = () => {
         componentProps: {
           placeholder: t('setting.flinkCluster.placeholder.addressRemoteMode'),
         },
-        ifShow: ({ values }) => values.executionMode == ExecModeEnum.REMOTE,
+        ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.STANDALONE,
         rules: [{ required: true, message: 
t('setting.flinkCluster.required.address') }],
       },
       {
@@ -333,7 +333,7 @@ export const useClusterSetting = () => {
     };
 
     switch (values.executionMode) {
-      case ExecModeEnum.REMOTE:
+      case ExecModeEnum.STANDALONE:
         Object.assign(params, {
           address: values.address,
         });
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index 584b1df6f..bc9d48208 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -81,10 +81,11 @@ class FlinkK8sWatchController extends Logger with AutoCloseable {
   def collectAccGroupMetric(groupId: String): FlinkMetricCV = {
     // get cluster metrics that in tracking
     val empty = FlinkMetricCV.empty(groupId)
-    getActiveWatchingIds() match {
+    getActiveWatchingIds().filter(_.groupId == groupId) match {
       case k if k.isEmpty => empty
       case k =>
-        flinkMetrics.getAll(for (elem <- k) yield ClusterKey.of(elem)) match {
+        flinkMetrics.getAll(
+          for (elem <- k if elem.groupId == groupId) yield ClusterKey.of(elem)) match {
           case m if m.isEmpty => empty
           case m => m.values.fold(empty)((x, y) => x + y)
         }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 9c5590f0a..9243e10f7 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.APPLICATION
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model._
@@ -126,7 +126,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
                 case _ =>
                   touchSessionJob(trackId) match {
                     case Some(state) =>
-                      if (state.jobState == FlinkJobState.LOST) {
+                      if (FlinkJobState.isEndState(state.jobState)) {
                         // can't find that job in the k8s cluster.
                         watchController.unWatching(trackId)
                       }
@@ -222,16 +222,19 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
       eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
     }
 
-    lazy val deployExists = KubernetesRetriever.isDeploymentExists(
-      trackId.namespace,
-      trackId.clusterId
-    )
-
-    if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) {
-      // remove trackId from cache of job that needs to be untracked
-      watchController.unWatching(trackId)
-      if (trackId.executeMode == APPLICATION) {
-        watchController.endpoints.invalidate(trackId.toClusterKey)
+    if (FlinkJobState.isEndState(jobState.jobState)) {
+      trackId.executeMode match {
+        case APPLICATION =>
+          val deployExists = KubernetesRetriever.isDeploymentExists(
+            trackId.namespace,
+            trackId.clusterId
+          )
+          if (!deployExists) {
+            watchController.endpoints.invalidate(trackId.toClusterKey)
+            watchController.unWatching(trackId)
+          }
+        case SESSION =>
+          watchController.unWatching(trackId)
       }
     }
   }
@@ -307,6 +310,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     val jobState = trackId match {
       case id if watchController.canceling.has(id) =>
         logger.info(s"trackId ${trackId.toString} is canceling")
+        watchController.trackIds.invalidate(id)
         FlinkJobState.CANCELED
       case _ =>
         // whether deployment exists on kubernetes cluster
@@ -386,7 +390,9 @@ object FlinkJobStatusWatcher {
     current match {
       case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
         previous match {
-          case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
+          case FlinkJobState.CANCELLING => {
+            FlinkJobState.CANCELED
+          }
           case FlinkJobState.FAILING => FlinkJobState.FAILED
           case _ =>
             current match {

Reply via email to