This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new ff8b93b04 [Improve] remapping Flink job state bug fixed.
ff8b93b04 is described below

commit ff8b93b040e588c5109e610af33cab566814a083
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 19:49:41 2024 +0800

    [Improve] remapping Flink job state bug fixed.
---
 .../src/main/assembly/script/data/mysql-data.sql   |  2 +-
 .../src/main/assembly/script/data/pgsql-data.sql   |  2 +-
 .../main/assembly/script/schema/mysql-schema.sql   |  1 -
 .../main/assembly/script/schema/pgsql-schema.sql   |  1 -
 .../main/assembly/script/upgrade/mysql/2.1.4.sql   | 20 +++--
 .../main/assembly/script/upgrade/pgsql/2.1.4.sql   | 10 ++-
 .../console/core/bean/AlertTemplate.java           |  3 +-
 .../controller/ApplicationHistoryController.java   | 18 +----
 .../console/core/entity/Application.java           |  5 +-
 .../console/core/mapper/ApplicationMapper.java     |  3 +-
 .../console/core/service/ApplicationService.java   |  2 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 89 ++++++++++++----------
 .../core/service/impl/ExternalLinkServiceImpl.java |  2 +-
 .../core/service/impl/FlinkClusterServiceImpl.java |  4 +-
 .../core/service/impl/SavePointServiceImpl.java    | 25 +++---
 .../console/core/task/FlinkAppHttpWatcher.java     |  8 +-
 .../core/task/FlinkK8sChangeEventListener.java     | 23 +++---
 .../src/main/resources/db/data-h2.sql              |  2 +-
 .../src/main/resources/db/schema-h2.sql            |  1 -
 .../resources/mapper/core/ApplicationMapper.xml    | 21 ++---
 .../core/service/alert/AlertServiceTest.java       |  4 +-
 .../src/api/flink/app/flinkHistory.ts              |  6 +-
 .../src/views/flink/app/EditFlink.vue              |  2 +-
 .../src/views/flink/app/hooks/useApp.tsx           |  6 +-
 .../flink/app/hooks/useCreateAndEditSchema.ts      | 12 +--
 .../src/views/flink/app/utils/index.ts             |  4 +-
 .../setting/FlinkCluster/useClusterSetting.ts      |  6 +-
 .../flink/kubernetes/enums/FlinkJobState.scala     |  2 +-
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 32 ++++----
 29 files changed, 152 insertions(+), 164 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 79a4766bb..bc9565b5f 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -30,7 +30,7 @@ insert into `t_team` values (100000, 'default', null, now(), 
now());
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL 
Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, 
null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), 
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 
'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL 
Demo', null, null, null, null, null, null, null , null, 100000, 1, null, null, 
null, null, null, null, null, '0', 0, null, null, null, null, null, null, 
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), 
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 
'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
index f38496ff4..bcd706880 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
@@ -25,7 +25,7 @@ insert into "public"."t_team" values (100000, 'default', 
null, now(), now());
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into "public"."t_flink_app" values (100000, 100000, 2, 4, null, null, 
'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, 
null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, 
null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, 
null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null, 
false, null, null, null, 'streampark,test');
+insert into "public"."t_flink_app" values (100000, 100000, 2, 4, null, null, 
'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, 1, 
null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, 
null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, 
now(), now(), null, 1, true, null, null, null, null, null, null, false, null, 
null, null, 'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 7d21dc70f..95024ed79 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -58,7 +58,6 @@ create table `t_flink_app` (
   `options` longtext collate utf8mb4_general_ci,
   `hot_params` text collate utf8mb4_general_ci,
   `user_id` bigint default null,
-  `app_id` varchar(64) collate utf8mb4_general_ci default null,
   `app_type` tinyint default null,
   `duration` bigint default null,
   `job_id` varchar(64) collate utf8mb4_general_ci default null,
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 5f92121be..a523c04a4 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -206,7 +206,6 @@ create table "public"."t_flink_app" (
   "options" text collate "pg_catalog"."default",
   "hot_params" text collate "pg_catalog"."default",
   "user_id" int8,
-  "app_id" varchar(64) collate "pg_catalog"."default",
   "app_type" int2,
   "duration" int8,
   "job_id" varchar(64) collate "pg_catalog"."default",
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
index 8d25f822f..04b7f6ea4 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
@@ -17,12 +17,18 @@
 
 use streampark;
 
-set names utf8mb4;
-set foreign_key_checks = 0;
+SET NAMES utf8mb4;
+SET foreign_key_checks = 0;
 
-update `t_flink_app` a inner join `t_flink_cluster` c
-on a.`cluster_id` = c.`cluster_id`
-    and a.`execution_mode` = 5
-    set a.`flink_cluster_id` = c.`id`;
+UPDATE `t_flink_app` a INNER JOIN `t_flink_cluster` c
+ON a.`cluster_id` = c.`cluster_id`
+AND a.`execution_mode` = 5
+SET a.`flink_cluster_id` = c.`id`;
 
-set foreign_key_checks = 1;
+UPDATE `t_flink_app`
+SET `cluster_id` = `app_id`
+WHERE `execution_mode` IN (2,3,5);
+
+ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
+
+SET foreign_key_checks = 1;
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
index 9472e343d..6b216e6ec 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
@@ -18,5 +18,11 @@
 UPDATE t_flink_app
 SET flink_cluster_id = t_flink_cluster.id
     FROM t_flink_cluster
-where t_flink_app.cluster_id = t_flink_cluster.cluster_id
-  and t_flink_app.execution_mode = 5;
+WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id
+  AND t_flink_app.execution_mode = 5;
+
+UPDATE t_flink_app
+SET cluster_id = app_id
+WHERE execution_mode IN (2,3,5);
+
+ALTER TABLE t_flink_app DROP COLUMN app_id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 8dd5b4d7f..bf448fd15 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -56,7 +56,8 @@ public class AlertTemplate implements Serializable {
 
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       String format = "%s/proxy/%s/";
-      String url = String.format(format, YarnUtils.getRMWebAppURL(false), 
application.getAppId());
+      String url =
+          String.format(format, YarnUtils.getRMWebAppURL(false), 
application.getClusterId());
       template.setLink(url);
     } else {
       template.setLink(null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
index 2ba542336..c40c2f8f0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.console.core.controller;
 
-import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.service.ApplicationService;
 
@@ -32,7 +31,6 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.ArrayList;
 import java.util.List;
 
 @Tag(name = "FLINK_APPLICATION_HISTORY_TAG")
@@ -61,20 +59,10 @@ public class ApplicationHistoryController {
   }
 
   @Operation(summary = "List the session cluster history records")
-  @PostMapping("sessionClusterIds")
+  @PostMapping("k8sClusterId")
   @RequiresPermissions("app:create")
-  public RestResponse listSessionClusterId(int executionMode) {
-    List<String> clusterIds;
-    switch (ExecutionMode.of(executionMode)) {
-      case KUBERNETES_NATIVE_SESSION:
-      case YARN_SESSION:
-      case REMOTE:
-        clusterIds = applicationService.getRecentK8sClusterId(executionMode);
-        break;
-      default:
-        clusterIds = new ArrayList<>(0);
-        break;
-    }
+  public RestResponse recentK8sClusterId() {
+    List<String> clusterIds = applicationService.getRecentK8sClusterId();
     return RestResponse.success(clusterIds);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index cb08078ef..88db35fa1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -83,9 +83,6 @@ public class Application implements Serializable {
   /** The name of the frontend and program displayed in yarn */
   private String jobName;
 
-  @TableField(updateStrategy = FieldStrategy.IGNORED)
-  private String appId;
-
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private String jobId;
 
@@ -96,7 +93,7 @@ public class Application implements Serializable {
   /** flink version */
   private Long versionId;
 
-  /** k8s cluster id */
+  /** 1. yarn application id(on yarn) 2. k8s application id (on k8s 
application) */
   private String clusterId;
 
   /** flink docker base image */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index a1f05be74..cf7b8e9cc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -41,8 +41,7 @@ public interface ApplicationMapper extends 
BaseMapper<Application> {
 
   List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
 
-  List<String> getRecentK8sClusterId(
-      @Param("executionMode") Integer executionMode, @Param("limitSize") 
Integer limit);
+  List<String> getRecentK8sClusterId(@Param("limitSize") Integer limit);
 
   List<String> getRecentFlinkBaseImage(@Param("limitSize") Integer limit);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index db347edf9..b9ce6d768 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -111,7 +111,7 @@ public interface ApplicationService extends 
IService<Application> {
 
   List<String> getRecentK8sNamespace();
 
-  List<String> getRecentK8sClusterId(Integer executionMode);
+  List<String> getRecentK8sClusterId();
 
   List<String> getRecentFlinkBaseImage();
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 21e646d92..f0a2261f9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -603,8 +603,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   }
 
   @Override
-  public List<String> getRecentK8sClusterId(Integer executionMode) {
-    return baseMapper.getRecentK8sClusterId(executionMode, 
DEFAULT_HISTORY_RECORD_LIMIT);
+  public List<String> getRecentK8sClusterId() {
+    return baseMapper.getRecentK8sClusterId(DEFAULT_HISTORY_RECORD_LIMIT);
   }
 
   @Override
@@ -834,56 +834,58 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         !existsByJobName,
         "[StreamPark] Application names can't be repeated, copy application 
failed.");
 
-    Application oldApp = getById(appParam.getId());
+    Application persist = getById(appParam.getId());
     Application newApp = new Application();
     String jobName = appParam.getJobName();
 
     newApp.setJobName(jobName);
     newApp.setClusterId(
-        ExecutionMode.isSessionMode(oldApp.getExecutionModeEnum()) ? 
oldApp.getClusterId() : null);
-    newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : 
oldApp.getArgs());
-    newApp.setVersionId(oldApp.getVersionId());
-
-    newApp.setFlinkClusterId(oldApp.getFlinkClusterId());
-    newApp.setRestartSize(oldApp.getRestartSize());
-    newApp.setJobType(oldApp.getJobType());
-    newApp.setOptions(oldApp.getOptions());
-    newApp.setDynamicProperties(oldApp.getDynamicProperties());
-    newApp.setResolveOrder(oldApp.getResolveOrder());
-    newApp.setExecutionMode(oldApp.getExecutionMode());
-    newApp.setFlinkImage(oldApp.getFlinkImage());
-    newApp.setK8sNamespace(oldApp.getK8sNamespace());
-    newApp.setK8sRestExposedType(oldApp.getK8sRestExposedType());
-    newApp.setK8sPodTemplate(oldApp.getK8sPodTemplate());
-    newApp.setK8sJmPodTemplate(oldApp.getK8sJmPodTemplate());
-    newApp.setK8sTmPodTemplate(oldApp.getK8sTmPodTemplate());
-    newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
-    newApp.setDescription(oldApp.getDescription());
-    newApp.setAlertId(oldApp.getAlertId());
-    newApp.setCpFailureAction(oldApp.getCpFailureAction());
-    newApp.setCpFailureRateInterval(oldApp.getCpFailureRateInterval());
-    newApp.setCpMaxFailureInterval(oldApp.getCpMaxFailureInterval());
-    newApp.setMainClass(oldApp.getMainClass());
-    newApp.setAppType(oldApp.getAppType());
-    newApp.setResourceFrom(oldApp.getResourceFrom());
-    newApp.setProjectId(oldApp.getProjectId());
-    newApp.setModule(oldApp.getModule());
+        ExecutionMode.isSessionMode(persist.getExecutionModeEnum())
+            ? persist.getClusterId()
+            : null);
+    newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : 
persist.getArgs());
+    newApp.setVersionId(persist.getVersionId());
+
+    newApp.setFlinkClusterId(persist.getFlinkClusterId());
+    newApp.setRestartSize(persist.getRestartSize());
+    newApp.setJobType(persist.getJobType());
+    newApp.setOptions(persist.getOptions());
+    newApp.setDynamicProperties(persist.getDynamicProperties());
+    newApp.setResolveOrder(persist.getResolveOrder());
+    newApp.setExecutionMode(persist.getExecutionMode());
+    newApp.setFlinkImage(persist.getFlinkImage());
+    newApp.setK8sNamespace(persist.getK8sNamespace());
+    newApp.setK8sRestExposedType(persist.getK8sRestExposedType());
+    newApp.setK8sPodTemplate(persist.getK8sPodTemplate());
+    newApp.setK8sJmPodTemplate(persist.getK8sJmPodTemplate());
+    newApp.setK8sTmPodTemplate(persist.getK8sTmPodTemplate());
+    newApp.setK8sHadoopIntegration(persist.getK8sHadoopIntegration());
+    newApp.setDescription(persist.getDescription());
+    newApp.setAlertId(persist.getAlertId());
+    newApp.setCpFailureAction(persist.getCpFailureAction());
+    newApp.setCpFailureRateInterval(persist.getCpFailureRateInterval());
+    newApp.setCpMaxFailureInterval(persist.getCpMaxFailureInterval());
+    newApp.setMainClass(persist.getMainClass());
+    newApp.setAppType(persist.getAppType());
+    newApp.setResourceFrom(persist.getResourceFrom());
+    newApp.setProjectId(persist.getProjectId());
+    newApp.setModule(persist.getModule());
     newApp.setUserId(serviceHelper.getUserId());
     newApp.setState(FlinkAppState.ADDED.getValue());
     newApp.setRelease(ReleaseState.NEED_RELEASE.get());
     newApp.setOptionState(OptionState.NONE.getValue());
-    newApp.setHotParams(oldApp.getHotParams());
+    newApp.setHotParams(persist.getHotParams());
 
     // createTime & modifyTime
     Date date = new Date();
     newApp.setCreateTime(date);
     newApp.setModifyTime(date);
 
-    newApp.setJar(oldApp.getJar());
-    newApp.setJarCheckSum(oldApp.getJarCheckSum());
-    newApp.setTags(oldApp.getTags());
-    newApp.setTeamId(oldApp.getTeamId());
-    newApp.setDependency(oldApp.getDependency());
+    newApp.setJar(persist.getJar());
+    newApp.setJarCheckSum(persist.getJarCheckSum());
+    newApp.setTags(persist.getTags());
+    newApp.setTeamId(persist.getTeamId());
+    newApp.setDependency(persist.getDependency());
 
     boolean saved = save(newApp);
     if (saved) {
@@ -908,7 +910,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       return newApp.getId();
     } else {
       throw new ApiAlertException(
-          "create application from copy failed, copy source app: " + 
oldApp.getJobName());
+          "create application from copy failed, copy source app: " + 
persist.getJobName());
     }
   }
 
@@ -990,7 +992,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     application.setDynamicProperties(appParam.getDynamicProperties());
     application.setResolveOrder(appParam.getResolveOrder());
     application.setExecutionMode(appParam.getExecutionMode());
-    application.setClusterId(appParam.getClusterId());
+    application.setFlinkClusterId(appParam.getFlinkClusterId());
     application.setFlinkImage(appParam.getFlinkImage());
     application.updateHotParams(appParam);
     application.setK8sRestExposedType(appParam.getK8sRestExposedType());
@@ -1285,6 +1287,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @Override
   public void cancel(Application appParam) throws Exception {
     FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionState.CANCELLING);
+
     Application application = getById(appParam.getId());
     application.setState(FlinkAppState.CANCELLING.getValue());
 
@@ -1293,7 +1296,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     applicationLog.setAppId(application.getId());
     applicationLog.setJobManagerUrl(application.getJobManagerUrl());
     applicationLog.setOptionTime(new Date());
-    applicationLog.setYarnAppId(application.getClusterId());
+    if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+      applicationLog.setYarnAppId(application.getClusterId());
+    }
 
     if (appParam.getSavePointed()) {
       if (!application.isKubernetesModeJob()) {
@@ -1692,7 +1697,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           }
 
           if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            application.setAppId(response.clusterId());
+            application.setClusterId(response.clusterId());
             applicationLog.setYarnAppId(response.clusterId());
           }
 
@@ -1946,7 +1951,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       case YARN_APPLICATION:
       case YARN_PER_JOB:
       case YARN_SESSION:
-        clusterId = application.getAppId();
+        clusterId = application.getClusterId();
         break;
       case KUBERNETES_NATIVE_APPLICATION:
         clusterId = application.getJobName();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 694afed63..b01aed743 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -88,7 +88,7 @@ public class ExternalLinkServiceImpl extends 
ServiceImpl<ExternalLinkMapper, Ext
     HashMap<String, String> map = new HashMap();
     map.put(PlaceholderType.JOB_ID.get(), app.getJobId());
     map.put(PlaceholderType.JOB_NAME.get(), app.getJobName());
-    map.put(PlaceholderType.YARN_ID.get(), app.getAppId());
+    map.put(PlaceholderType.YARN_ID.get(), app.getClusterId());
     PropertyPlaceholderHelper propertyPlaceholderHelper = new 
PropertyPlaceholderHelper("{", "}");
     link.setRenderedLinkUrl(
         
propertyPlaceholderHelper.replacePlaceholders(link.getLinkUrl().trim(), 
map::get));
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1240313d9..efc2d295a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -174,7 +174,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       log.info("deploy cluster request: {}", deployRequest);
       Future<DeployResponse> future =
           bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
-      DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
+      DeployResponse deployResponse = future.get();
       if (deployResponse.error() != null) {
         throw new ApiDetailException(
             "deploy cluster "
@@ -313,7 +313,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     try {
       Future<ShutDownResponse> future =
           bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
-      ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
+      ShutDownResponse shutDownResponse = future.get();
       if (shutDownResponse.error() != null) {
         throw new ApiDetailException(
             "shutdown cluster failed, error: \n"
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index b380483cc..5b59c9ff4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -289,16 +289,15 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     applicationLog.setAppId(application.getId());
     applicationLog.setJobManagerUrl(application.getJobManagerUrl());
     applicationLog.setOptionTime(new Date());
-    applicationLog.setYarnAppId(application.getClusterId());
-
+    if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+      applicationLog.setYarnAppId(application.getClusterId());
+    }
     if (!application.isKubernetesModeJob()) {
       FlinkAppHttpWatcher.addSavepoint(application.getId());
       application.setOptionState(OptionState.SAVEPOINTING.getValue());
       application.setOptionTime(new Date());
       this.applicationService.updateById(application);
       flinkAppHttpWatcher.initialize();
-    } else {
-      this.applicationService.updateById(application);
     }
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -354,11 +353,13 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
         .whenComplete(
             (t, e) -> {
               applicationLogService.save(applicationLog);
-              application.setOptionState(OptionState.NONE.getValue());
-              application.setOptionTime(new Date());
-              applicationService.update(application);
-              flinkAppHttpWatcher.cleanSavepoint(application);
-              flinkAppHttpWatcher.initialize();
+              if (!application.isKubernetesModeJob()) {
+                application.setOptionState(OptionState.NONE.getValue());
+                application.setOptionTime(new Date());
+                applicationService.update(application);
+                flinkAppHttpWatcher.cleanSavepoint(application);
+                flinkAppHttpWatcher.initialize();
+              }
             });
   }
 
@@ -396,7 +397,9 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   private String getClusterId(Application application, FlinkCluster cluster) {
     if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
-      return application.getClusterId();
+      return 
ExecutionMode.isKubernetesSessionMode(application.getExecutionMode())
+          ? cluster.getClusterId()
+          : application.getClusterId();
     } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
         Utils.notNull(
@@ -406,7 +409,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
                 application.getFlinkClusterId()));
         return cluster.getClusterId();
       } else {
-        return application.getAppId();
+        return application.getClusterId();
       }
     }
     return null;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 33fbcd310..36e115b8f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -701,12 +701,12 @@ public class FlinkAppHttpWatcher {
   }
 
   private YarnAppInfo httpYarnAppInfo(Application application) throws 
Exception {
-    String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
+    String reqURL = "ws/v1/cluster/apps/".concat(application.getClusterId());
     return yarnRestRequest(reqURL, YarnAppInfo.class);
   }
 
   private Overview httpOverview(Application application) throws IOException {
-    String appId = application.getAppId();
+    String appId = application.getClusterId();
     if (appId != null) {
       if 
(application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
           || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
@@ -731,7 +731,7 @@ public class FlinkAppHttpWatcher {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
-        reqURL = String.format(format, application.getAppId());
+        reqURL = String.format(format, application.getClusterId());
       } else {
         String format = "%s/" + flinkUrl;
         reqURL = String.format(format, application.getJobManagerUrl());
@@ -765,7 +765,7 @@ public class FlinkAppHttpWatcher {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
-        reqURL = String.format(format, application.getAppId(), 
application.getJobId());
+        reqURL = String.format(format, application.getClusterId(), 
application.getJobId());
       } else {
         String format = "%s/" + flinkUrl;
         reqURL = String.format(format, application.getJobManagerUrl(), 
application.getJobId());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 47129b165..a006aa085 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -81,17 +81,8 @@ public class FlinkK8sChangeEventListener {
       return;
     }
 
-    Enumeration.Value inferState =
-        FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
-            jobStatus.jobState(), 
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
-
-    FlinkAppState appState = fromK8sFlinkJobState(inferState);
-    if (app.getFlinkAppStateEnum() == appState) {
-      return;
-    }
-
     // update application record
-    setByJobStatusCV(app, jobStatus, inferState);
+    setByJobStatusCV(app, jobStatus);
 
     applicationService.persistMetrics(app);
 
@@ -155,8 +146,12 @@ public class FlinkK8sChangeEventListener {
     
checkpointProcessor.process(applicationService.getById(event.trackId().appId()),
 checkPoint);
   }
 
-  private void setByJobStatusCV(
-      Application app, JobStatusCV jobStatus, Enumeration.Value inferState) {
+  private void setByJobStatusCV(Application app, JobStatusCV jobStatus) {
+    // infer the final flink job state
+    Enumeration.Value state =
+        FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
+            jobStatus.jobState(), 
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+
     // corrective start-time / end-time / duration
     long preStartTime = app.getStartTime() != null ? 
app.getStartTime().getTime() : 0;
     long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
@@ -164,7 +159,7 @@ public class FlinkK8sChangeEventListener {
     long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
     long duration = jobStatus.duration();
 
-    if (FlinkJobState.isEndState(inferState)) {
+    if (FlinkJobState.isEndState(state)) {
       if (endTime < startTime) {
         endTime = System.currentTimeMillis();
       }
@@ -173,7 +168,7 @@ public class FlinkK8sChangeEventListener {
       }
     }
 
-    app.setState(fromK8sFlinkJobState(inferState).getValue());
+    app.setState(fromK8sFlinkJobState(state).getValue());
     app.setJobId(jobStatus.jobId());
     app.setTotalTask(jobStatus.taskTotal());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 3433433a8..a247d61e6 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team', 
now(), now());
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL 
Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, 
null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), 
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 
'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL 
Demo', null, null, null, null, null, null, null , null, 100000, 1, null, null, 
null, null, null, null, null, '0', 0, null, null, null, null, null, null, 
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), 
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 
'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index f81e9f1dd..0816a78f1 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -51,7 +51,6 @@ create table if not exists `t_flink_app` (
   `options` longtext,
   `hot_params` text ,
   `user_id` bigint default null,
-  `app_id` varchar(64) default null,
   `app_type` tinyint default null,
   `duration` bigint default null,
   `job_id` varchar(64) default null,
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index b6f71849c..679473f63 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -28,7 +28,6 @@
         <result column="dynamic_properties" jdbcType="LONGVARCHAR" 
property="dynamicProperties"/>
         <result column="hot_params" jdbcType="VARCHAR" property="hotParams"/>
         <result column="job_name" jdbcType="VARCHAR" property="jobName"/>
-        <result column="app_id" jdbcType="VARCHAR" property="appId"/>
         <result column="version_id" jdbcType="BIGINT" property="versionId"/>
         <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
         <result column="flink_cluster_id" jdbcType="BIGINT" 
property="flinkClusterId"/>
@@ -178,13 +177,9 @@
                     and p.name like '%' || #{application.projectName} || '%'
                 </if>
             </if>
-            <if test="application.appId != null and application.appId != ''">
-                and t.app_id = #{application.appId}
-            </if>
             <if test="application.state != null and application.state != ''">
                 and t.state = #{application.state}
             </if>
-
             <if test="application.userId != null and application.userId != ''">
                 and t.user_id = #{application.userId}
             </if>
@@ -286,12 +281,12 @@
     <update id="mapping" 
parameterType="org.apache.streampark.console.core.entity.Application">
         update t_flink_app
         <set>
+            <if test="application.clusterId != null">
+                cluster_id=#{application.clusterId},
+            </if>
             <if test="application.jobId != null">
                 job_id=#{application.jobId},
             </if>
-            <if test="application.appId != null">
-                app_id=#{application.appId},
-            </if>
             end_time=null,
             state=14,
             tracking=1
@@ -314,11 +309,11 @@
     <select id="getRecentK8sClusterId" resultType="java.lang.String" 
parameterType="java.util.Map">
         select cluster_id
         from (
-            select cluster_id, max(create_time) as ct
-            from t_flink_app
-            where cluster_id is not null
-            and execution_mode = #{executionMode}
-            group by cluster_id
+            select c.cluster_id, max(a.create_time) as ct
+            from t_flink_app a inner join t_flink_cluster c
+            on a.flink_cluster_id = c.id
+            where a.execution_mode = 5
+            group by c.cluster_id
             order by ct desc
         ) as ci
         limit #{limitSize}
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 348a2f604..955f9104f 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -162,7 +162,7 @@ class AlertServiceTest {
     Application application = new Application();
     application.setStartTime(new Date());
     application.setJobName("Test My Job");
-    application.setAppId("1234567890");
+    application.setClusterId("1234567890");
     application.setAlertId(1);
 
     application.setRestartCount(5);
@@ -205,7 +205,7 @@ class AlertServiceTest {
       duration = application.getEndTime().getTime() - 
application.getStartTime().getTime();
     }
     String format = "%s/proxy/%s/";
-    String url = String.format(format, YarnUtils.getRMWebAppURL(false), 
application.getAppId());
+    String url = String.format(format, YarnUtils.getRMWebAppURL(false), 
application.getClusterId());
 
     AlertTemplate template = new AlertTemplate();
     template.setJobName(application.getJobName());
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
 
b/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
index 3352777ce..11cacb7dd 100644
--- 
a/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
+++ 
b/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
@@ -19,7 +19,7 @@ import { defHttp } from '/@/utils/http/axios';
 enum HISTORY_API {
   UPLOAD_JARS = '/flink/history/uploadJars',
   K8S_NAMESPACES = '/flink/history/k8sNamespaces',
-  SESSION_CLUSTER_IDS = '/flink/history/sessionClusterIds',
+  K8S_SESSION_CLUSTER_ID = '/flink/history/k8sClusterId',
   FLINK_BASE_IMAGES = '/flink/history/flinkBaseImages',
   FLINK_POD_TEMPLATES = '/flink/history/flinkPodTemplates',
   FLINK_JM_POD_TEMPLATES = '/flink/history/flinkJmPodTemplates',
@@ -42,8 +42,8 @@ export function fetchUploadJars(): Promise<string[]> {
   return defHttp.post({ url: HISTORY_API.UPLOAD_JARS });
 }
 
-export function fetchSessionClusterIds(data) {
-  return defHttp.post({ url: HISTORY_API.SESSION_CLUSTER_IDS, data });
+export function fetchK8sSessionClusterId() {
+  return defHttp.post({ url: HISTORY_API.K8S_SESSION_CLUSTER_ID });
 }
 
 export function fetchFlinkBaseImages() {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index ed8f04c93..8f40c8f2a 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -204,7 +204,7 @@
       return;
     }
     const value = await handleGetApplication();
-    setFieldsValue(value);
+    await setFieldsValue(value);
     if (app.resourceFrom == ResourceFromEnum.CICD) {
       jars.value = await fetchListJars({
         id: app.projectId,
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 190aa6cd3..c22f3a46a 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -333,10 +333,10 @@ export const useFlinkApplication = (openStartModal: Fn) 
=> {
             ].includes(app.executionMode) && (
               <Form.Item
                 label="YARN Application Id"
-                name="appId"
+                name="clusterId"
                 rules={[{ required: true, message: 'YARN ApplicationId is 
required' }]}
               >
-                <Input type="text" placeholder="ApplicationId" 
v-model:value={formValue.appId} />
+                <Input type="text" placeholder="ApplicationId" 
v-model:value={formValue.clusterId} />
               </Form.Item>
             )}
             <Form.Item
@@ -356,7 +356,7 @@ export const useFlinkApplication = (openStartModal: Fn) => {
           await mappingRef.value.validate();
           await fetchMapping({
             id: app.id,
-            appId: formValue.appId,
+            clusterId: formValue.clusterId || null,
             jobId: formValue.jobId,
           });
           Swal.fire({
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index fd8af4ef6..09e58f493 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -42,7 +42,7 @@ import { fetchVariableAll } from '/@/api/flink/variable';
 import {
   fetchFlinkBaseImages,
   fetchK8sNamespaces,
-  fetchSessionClusterIds,
+  fetchK8sSessionClusterId,
 } from '/@/api/flink/app/flinkHistory';
 import { fetchSelect } from '/@/api/flink/project';
 import { fetchAlertSetting } from '/@/api/flink/setting/alert';
@@ -584,13 +584,13 @@ export const useCreateAndEditSchema = (
     });
 
     //get flinkCluster
-    fetchFlinkCluster().then((res) => {
-      flinkClusters.value = res;
+    fetchFlinkCluster().then((resp) => {
+      flinkClusters.value = resp;
     });
-    fetchK8sNamespaces().then((res) => {
-      historyRecord.k8sNamespace = res;
+    fetchK8sNamespaces().then((resp) => {
+      historyRecord.k8sNamespace = resp;
     });
-    fetchSessionClusterIds({ executionMode: ExecModeEnum.KUBERNETES_SESSION 
}).then((res) => {
+    fetchK8sSessionClusterId().then((res) => {
       historyRecord.k8sSessionClusterId = res;
     });
     fetchFlinkBaseImages().then((res) => {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 87274d832..9bcc01284 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -115,9 +115,9 @@ export async function handleView(app: AppListRecord, yarn: 
Nullable<string>) {
   ) {
     if (!yarn) {
       const res = await fetchYarn();
-      window.open(res + '/proxy/' + app['appId'] + '/');
+      window.open(res + '/proxy/' + app['clusterId'] + '/');
     } else {
-      window.open(yarn + '/proxy/' + app['appId'] + '/');
+      window.open(yarn + '/proxy/' + app['clusterId'] + '/');
     }
   } else {
     if (app.flinkRestUrl) {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
index dcbf76d9a..14451b34e 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
@@ -33,7 +33,7 @@ import optionData from '../../flink/app/data/option';
 import {
   fetchFlinkBaseImages,
   fetchK8sNamespaces,
-  fetchSessionClusterIds,
+  fetchK8sSessionClusterId,
 } from '/@/api/flink/app/flinkHistory';
 import { handleFormValue } from '../../flink/app/utils';
 import { useMessage } from '/@/hooks/web/useMessage';
@@ -371,9 +371,7 @@ export const useClusterSetting = () => {
     fetchK8sNamespaces().then((res) => {
       historyRecord.k8sNamespace = res;
     });
-    fetchSessionClusterIds({
-      executionMode: ExecModeEnum.KUBERNETES_SESSION,
-    }).then((res) => {
+    fetchK8sSessionClusterId().then((res) => {
       historyRecord.k8sSessionClusterId = res;
     });
     fetchFlinkBaseImages().then((res) => {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
index f56065e9d..fcd4e88e0 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
@@ -41,7 +41,7 @@ object FlinkJobState extends Enumeration {
       SUSPENDED, RECONCILING = Value
 
   // ending flink states, the tracking monitor will stop tracking these states 
of flink job.
-  val endingStates = Seq(FAILED, CANCELED, FINISHED, POS_TERMINATED, 
TERMINATED, LOST)
+  private val endingStates = Seq(FAILED, CANCELED, FINISHED, POS_TERMINATED, 
TERMINATED, LOST)
 
   def of(value: String): FlinkJobState.Value = {
     this.values.find(_.toString == value).getOrElse(OTHER)
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 632dc8b58..9c5590f0a 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -116,27 +116,24 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
       val sessionIds = trackIds.filter(_.executeMode == 
FlinkK8sExecuteMode.SESSION)
       val sessionCluster = 
sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
       val sessionFuture = sessionCluster.map {
-        id =>
-          val future = Future(touchSessionAllJob(id))
+        trackId =>
+          val future = Future(touchSessionAllJob(trackId))
           future.onComplete(_.toOption match {
             case Some(map) =>
-              sessionIds.foreach(
-                id => {
-                  map.find(_._1.jobId == id.jobId) match {
-                    case Some(job) =>
-                      updateState(job._1.copy(appId = id.appId), job._2)
+              map.find(_._1.jobId == trackId.jobId) match {
+                case Some(job) =>
+                  updateState(job._1.copy(appId = trackId.appId), job._2)
+                case _ =>
+                  touchSessionJob(trackId) match {
+                    case Some(state) =>
+                      if (state.jobState == FlinkJobState.LOST) {
+                        // can't find that job in the k8s cluster.
+                        watchController.unWatching(trackId)
+                      }
+                      eventBus.postSync(FlinkJobStatusChangeEvent(trackId, 
state))
                     case _ =>
-                      // can't find that job in the k8s cluster.
-                      watchController.unWatching(id)
-                      val lostState = JobStatusCV(
-                        jobState = FlinkJobState.LOST,
-                        jobId = id.jobId,
-                        pollEmitTime = System.currentTimeMillis,
-                        pollAckTime = System.currentTimeMillis
-                      )
-                      eventBus.postSync(FlinkJobStatusChangeEvent(id, 
lostState))
                   }
-                })
+              }
             case _ =>
           })
           future
@@ -221,6 +218,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
       watchController.jobStatuses.put(trackId, jobState)
       // set jobId to trackIds
       watchController.trackIds.update(trackId)
+
       eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
     }
 

Reply via email to