This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9f825706d [Improve] Flink cluster status monitoring improvement (#2826)
9f825706d is described below

commit 9f825706d2ef5ad354d11adbe53ed25a71f87087
Author: benjobs <[email protected]>
AuthorDate: Tue Jul 11 23:53:29 2023 +0800

    [Improve] Flink cluster status monitoring improvement (#2826)
    
    * [Improve] Flink cluster status monitoring improvement
    
    * flink cluster jobManagerUrl improvement
---
 .../streampark/common/enums/ClusterState.java      |  20 +--
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |  26 +--
 .../main/assembly/script/upgrade/pgsql/2.2.0.sql   |  22 +--
 .../console/core/annotation/AppUpdated.java        |   2 +-
 .../console/core/aspect/StreamParkAspect.java      |   6 +-
 .../console/core/mapper/ApplicationMapper.java     |   2 +-
 .../console/core/service/ApplicationService.java   |   2 +-
 .../console/core/service/FlinkClusterService.java  |   3 +
 .../core/service/impl/AppBuildPipeServiceImpl.java |  12 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  32 ++--
 .../core/service/impl/FlinkClusterServiceImpl.java |  86 +++++----
 .../core/service/impl/ProjectServiceImpl.java      |   8 +-
 .../core/service/impl/SavePointServiceImpl.java    |  10 +-
 .../console/core/task/CheckpointProcessor.java     |   4 +-
 .../console/core/task/FlinkClusterWatcher.java     | 194 +++++++--------------
 ...nkRESTAPIWatcher.java => FlinkHttpWatcher.java} | 127 +++++++-------
 .../resources/mapper/core/ApplicationMapper.xml    |   2 +-
 17 files changed, 242 insertions(+), 316 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index decfc770a..d9a715f7e 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -60,23 +60,13 @@ public enum ClusterState implements Serializable {
     return value;
   }
 
-  public static boolean isCreateState(ClusterState state) {
-    return CREATED.equals(state);
-  }
-
-  public static boolean isRunningState(ClusterState state) {
+  public static boolean isRunning(ClusterState state) {
     return RUNNING.equals(state);
   }
 
-  public static boolean isStoppedState(ClusterState state) {
-    return STOPPED.equals(state);
-  }
-
-  public static boolean isLostState(ClusterState state) {
-    return LOST.equals(state);
-  }
-
-  public static boolean isUnknownState(ClusterState state) {
-    return UNKNOWN.equals(state);
+  public static boolean isFailed(ClusterState state) {
+    return state == ClusterState.STOPPED
+        || state == ClusterState.LOST
+        || state == ClusterState.UNKNOWN;
   }
 }
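
Note on the ClusterState change above: the five per-state helpers collapse into
two checks, so a call site that used to test STOPPED, LOST and UNKNOWN
separately now makes a single isFailed() call. A minimal sketch of the
resulting call-site shape (the branch bodies are illustrative, not part of this
commit):

    ClusterState state = ClusterState.of(flinkCluster.getClusterState());
    if (ClusterState.isRunning(state)) {
      // cluster is healthy, keep watching
    } else if (ClusterState.isFailed(state)) {
      // STOPPED, LOST and UNKNOWN are handled uniformly:
      // persist the final state, stop watching, send an alert
    }
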
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 16404952d..757ac6cd7 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -25,19 +25,19 @@ set foreign_key_checks = 0;
 -- ----------------------------
 drop table if exists `t_resource`;
 create table `t_resource` (
-                                `id` bigint not null auto_increment,
-                                `resource_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the resource file',
-                                `resource_type` int  not null comment '0:app 1:common 2:connector 3:format 4:udf',
-                                `resource` text collate utf8mb4_general_ci comment 'resource content, including jars and poms',
-                                `engine_type` int  not null comment 'compute engine type, 0:apache flink 1:apache spark',
-                                `main_class` varchar(255) collate utf8mb4_general_ci default null,
-                                `description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
-                                `creator_id` bigint collate utf8mb4_general_ci not null comment 'user id of creator',
-                                `team_id` bigint collate utf8mb4_general_ci not null comment 'team id',
-                                `create_time` datetime not null default current_timestamp comment 'create time',
-                                `modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
-                                primary key (`id`) using btree,
-                                unique key `un_team_vcode_inx` (`team_id`,`resource_name`) using btree
+`id` bigint not null auto_increment,
+`resource_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the resource file',
+`resource_type` int  not null comment '0:app 1:common 2:connector 3:format 4:udf',
+`resource` text collate utf8mb4_general_ci comment 'resource content, including jars and poms',
+`engine_type` int  not null comment 'compute engine type, 0:apache flink 1:apache spark',
+`main_class` varchar(255) collate utf8mb4_general_ci default null,
+`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
+`creator_id` bigint collate utf8mb4_general_ci not null comment 'user id of creator',
+`team_id` bigint collate utf8mb4_general_ci not null comment 'team id',
+`create_time` datetime not null default current_timestamp comment 'create time',
+`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
+primary key (`id`) using btree,
+unique key `un_team_vcode_inx` (`team_id`,`resource_name`) using btree
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;
 
 alter table `t_flink_sql`
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 56b055ce8..7f5a54f7d 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -22,17 +22,17 @@ create sequence "public"."streampark_t_resource_id_seq"
     increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;
 
 create table "public"."t_resource" (
-                                       "id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
-                                       "resource_name" varchar(128) collate "pg_catalog"."default" not null,
-                                       "resource_type" int4,
-                                       "resource" text collate "pg_catalog"."default",
-                                       "engine_type" int4,
-                                       "main_class" varchar(255) collate "pg_catalog"."default",
-                                       "description" text collate "pg_catalog"."default" default null,
-                                       "creator_id" int8  not null,
-                                       "team_id" int8  not null,
-                                       "create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
-                                       "modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
+"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
+"resource_name" varchar(128) collate "pg_catalog"."default" not null,
+"resource_type" int4,
+"resource" text collate "pg_catalog"."default",
+"engine_type" int4,
+"main_class" varchar(255) collate "pg_catalog"."default",
+"description" text collate "pg_catalog"."default" default null,
+"creator_id" int8  not null,
+"team_id" int8  not null,
+"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
+"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
 )
 ;
 comment on column "public"."t_resource"."id" is 'Resource id';
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 345a361c7..97c77a79d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -30,7 +30,7 @@ import java.lang.annotation.Target;
  * cause the application to be updated, Will work together with {@link
  * org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)}, The
  * final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkRESTAPIWatcher#WATCHING_APPS}, Make the state of the
+ * org.apache.streampark.console.core.task.FlinkHttpWatcher#WATCHING_APPS}, Make the state of the
  * job consistent with the database
  */
 @Target(ElementType.METHOD)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 6646ead55..2392885bd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -28,7 +28,7 @@ import org.apache.streampark.console.core.enums.PermissionType;
 import org.apache.streampark.console.core.enums.UserType;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.console.system.entity.AccessToken;
 import org.apache.streampark.console.system.entity.Member;
 import org.apache.streampark.console.system.entity.User;
@@ -58,7 +58,7 @@ import java.util.Objects;
 @Aspect
 public class StreamParkAspect {
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
   @Autowired private CommonService commonService;
   @Autowired private MemberService memberService;
   @Autowired private ApplicationService applicationService;
@@ -93,7 +93,7 @@ public class StreamParkAspect {
     MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
     log.debug("appUpdated aspect, method:{}", methodSignature.getName());
     Object target = joinPoint.proceed();
-    flinkRESTAPIWatcher.init();
+    flinkHttpWatcher.init();
     return target;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index d1b59fa07..55c2ee72f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -66,5 +66,5 @@ public interface ApplicationMapper extends BaseMapper<Application> {
 
   boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
 
-  Integer getAffectedJobsByClusterId(@Param("clusterId") Long clusterId);
+  Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index a64922afa..cba588c65 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -105,7 +105,7 @@ public interface ApplicationService extends IService<Application> {
 
   boolean existsJobByClusterId(Long clusterId);
 
-  Integer getAffectedJobsByClusterId(Long clusterId);
+  Integer countJobsByClusterId(Long clusterId);
 
   boolean existsJobByFlinkEnvId(Long flinkEnvId);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..6d4559735 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -47,4 +48,6 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
   Boolean existsByFlinkEnvId(Long id);
 
   List<FlinkCluster> getByExecutionModes(Collection<ExecutionMode> executionModes);
+
+  void updateClusterFinalState(Long id, ClusterState state);
 }
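
The new updateClusterFinalState method gives callers a single write path for
terminal cluster states. A sketch of the expected usage, mirroring the watcher
change later in this commit (the LOST value is just an example):

    // on detecting a dead cluster, persist its final state in one call
    flinkClusterService.updateClusterFinalState(cluster.getId(), ClusterState.LOST);
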
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index ac4917bc7..2a38308cb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -55,7 +55,7 @@ import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.DependencyInfo;
@@ -131,7 +131,7 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
 
   @Autowired private ApplicationConfigService applicationConfigService;
 
@@ -217,8 +217,8 @@ public class AppBuildPipeServiceImpl
             app.setRelease(ReleaseState.RELEASING.get());
             applicationService.updateRelease(app);
 
-            if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
-              flinkRESTAPIWatcher.init();
+            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
+              flinkHttpWatcher.init();
             }
 
             // 1) checkEnv
@@ -341,8 +341,8 @@ public class AppBuildPipeServiceImpl
             }
             applicationService.updateRelease(app);
             applicationLogService.save(applicationLog);
-            if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
-              flinkRESTAPIWatcher.init();
+            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
+              flinkHttpWatcher.init();
             }
           }
         });
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 98253bb9f..886756770 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,7 +78,7 @@ import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -238,7 +238,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     Integer runningJob = 0;
 
     // stat metrics from other than kubernetes mode
-    for (Application app : FlinkRESTAPIWatcher.getWatchingApps()) {
+    for (Application app : FlinkHttpWatcher.getWatchingApps()) {
       if (!teamId.equals(app.getTeamId())) {
         continue;
       }
@@ -400,7 +400,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.unWatching(toTrackId(application));
     } else {
-      FlinkRESTAPIWatcher.unWatching(paramApp.getId());
+      FlinkHttpWatcher.unWatching(paramApp.getId());
     }
     return true;
   }
@@ -449,7 +449,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     if (!FlinkAppState.CANCELED.equals(state)) {
       return false;
     }
-    long cancelUserId = FlinkRESTAPIWatcher.getCanceledJobUserId(appId);
+    long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
     long appUserId = application.getUserId();
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
@@ -530,7 +530,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     if (exists) {
       return true;
     }
-    for (Application application : FlinkRESTAPIWatcher.getWatchingApps()) {
+    for (Application application : FlinkHttpWatcher.getWatchingApps()) {
       if (clusterId.equals(application.getFlinkClusterId())
           && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
         return true;
@@ -545,8 +545,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
   }
 
   @Override
-  public Integer getAffectedJobsByClusterId(Long clusterId) {
-    return baseMapper.getAffectedJobsByClusterId(clusterId);
+  public Integer countJobsByClusterId(Long clusterId) {
+    return baseMapper.countJobsByClusterId(clusterId);
   }
 
   @Override
@@ -1208,14 +1208,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.doWatching(toTrackId(application));
     } else {
-      FlinkRESTAPIWatcher.doWatching(application);
+      FlinkHttpWatcher.doWatching(application);
     }
     return mapping;
   }
 
   @Override
   public void cancel(Application appParam) throws Exception {
-    FlinkRESTAPIWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
+    FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
     Application application = getById(appParam.getId());
     application.setState(FlinkAppState.CANCELLING.getValue());
 
@@ -1227,7 +1227,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     applicationLog.setYarnAppId(application.getClusterId());
 
     if (appParam.getSavePointed()) {
-      FlinkRESTAPIWatcher.addSavepoint(application.getId());
+      FlinkHttpWatcher.addSavepoint(application.getId());
       application.setOptionState(OptionState.SAVEPOINTING.getValue());
     } else {
       application.setOptionState(OptionState.CANCELLING.getValue());
@@ -1238,7 +1238,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     Long userId = commonService.getUserId();
     if (!application.getUserId().equals(userId)) {
-      FlinkRESTAPIWatcher.addCanceledApp(application.getId(), userId);
+      FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
     }
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -1343,7 +1343,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                   k8SFlinkTrackMonitor.unWatching(id);
                   k8SFlinkTrackMonitor.doWatching(id);
                 } else {
-                  FlinkRESTAPIWatcher.unWatching(application.getId());
+                  FlinkHttpWatcher.unWatching(application.getId());
                 }
 
                 String exception = Utils.stringifyException(e);
@@ -1606,8 +1606,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
               if (isKubernetesApp(application)) {
                 k8SFlinkTrackMonitor.doWatching(toTrackId(application));
               } else {
-                FlinkRESTAPIWatcher.setOptionState(appParam.getId(), OptionState.STARTING);
-                FlinkRESTAPIWatcher.doWatching(application);
+                FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.STARTING);
+                FlinkHttpWatcher.doWatching(application);
               }
 
               applicationLog.setSuccess(true);
@@ -1628,7 +1628,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 if (isKubernetesApp(app)) {
                   k8SFlinkTrackMonitor.unWatching(toTrackId(app));
                 } else {
-                  FlinkRESTAPIWatcher.unWatching(appParam.getId());
+                  FlinkHttpWatcher.unWatching(appParam.getId());
                 }
               }
             })
@@ -1734,7 +1734,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       k8SFlinkTrackMonitor.unWatching(id);
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
-      FlinkRESTAPIWatcher.unWatching(application.getId());
+      FlinkHttpWatcher.unWatching(application.getId());
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 769ddf626..ad6af6617 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
@@ -149,7 +150,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
     boolean ret = save(flinkCluster);
     if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
     }
     return ret;
   }
@@ -165,7 +166,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
           "Deploy cluster failed, unknown reason,please check you params or StreamPark error log");
       if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
         String address =
-            YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
+            String.format(
+                "%s/proxy/%s/", YarnUtils.getRMWebAppURL(true), deployResponse.clusterId());
         flinkCluster.setAddress(address);
         flinkCluster.setJobManagerUrl(deployResponse.address());
       } else {
@@ -176,12 +178,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setException(null);
       flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
-      FlinkClusterWatcher.addFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.addWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
@@ -190,18 +190,31 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void update(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = getById(cluster.getId());
-    boolean success = validateQueueIfNeeded(flinkCluster, cluster);
+  public void update(FlinkCluster paramOfCluster) {
+    FlinkCluster flinkCluster = getById(paramOfCluster.getId());
+    boolean success = validateQueueIfNeeded(flinkCluster, paramOfCluster);
     ApiAlertException.throwIfFalse(
-        success, String.format(ERROR_CLUSTER_QUEUE_HINT, cluster.getYarnQueue()));
-    updateCluster(cluster, flinkCluster);
-    try {
-      updateById(flinkCluster);
-    } catch (Exception e) {
-      throw new ApiDetailException(
-          "Update cluster failed, Caused By: " + ExceptionUtils.getStackTrace(e));
+        success, String.format(ERROR_CLUSTER_QUEUE_HINT, paramOfCluster.getYarnQueue()));
+
+    flinkCluster.setClusterName(paramOfCluster.getClusterName());
+    flinkCluster.setDescription(paramOfCluster.getDescription());
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      flinkCluster.setAddress(paramOfCluster.getAddress());
+    } else {
+      flinkCluster.setClusterId(paramOfCluster.getClusterId());
+      flinkCluster.setVersionId(paramOfCluster.getVersionId());
+      flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+      flinkCluster.setOptions(paramOfCluster.getOptions());
+      flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+      flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+      flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+      flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+      flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+      flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+      flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+      flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
     }
+    updateById(flinkCluster);
   }
 
   @Override
@@ -224,10 +237,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       // 4) shutdown
       ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, clusterId);
       ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
-      flinkCluster.setAddress(null);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
       flinkCluster.setEndTime(new Date());
-      FlinkClusterWatcher.removeFlinkCluster(flinkCluster);
+      FlinkClusterWatcher.unWatching(flinkCluster);
       updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -267,6 +279,16 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                         .collect(Collectors.toSet())));
   }
 
+  @Override
+  public void updateClusterFinalState(Long id, ClusterState state) {
+    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
+        new LambdaUpdateWrapper<FlinkCluster>()
+            .eq(FlinkCluster::getId, id)
+            .set(FlinkCluster::getClusterState, state.getValue())
+            .set(FlinkCluster::getEndTime, new Date());
+    update(updateWrapper);
+  }
+
   @Override
   public void delete(FlinkCluster cluster) {
     Long id = cluster.getId();
@@ -276,7 +298,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
         || ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Flink cluster is running, cannot be delete, please check.");
     }
 
@@ -367,11 +389,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
     if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
       ApiAlertException.throwIfFalse(
-          ClusterState.isRunningState(flinkCluster.getClusterStateEnum()),
+          ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
       if (!flinkCluster.verifyClusterConnection()) {
-        flinkCluster.setAddress(null);
-        flinkCluster.setJobManagerUrl(null);
         flinkCluster.setClusterState(ClusterState.LOST.getValue());
         updateById(flinkCluster);
         throw new ApiAlertException("Current cluster is not active, please check!");
@@ -379,30 +399,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
   }
 
-  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
-    flinkCluster.setClusterName(cluster.getClusterName());
-    flinkCluster.setDescription(cluster.getDescription());
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setAddress(cluster.getAddress());
-      flinkCluster.setJobManagerUrl(cluster.getAddress());
-    } else {
-      flinkCluster.setAddress(null);
-      flinkCluster.setJobManagerUrl(null);
-      flinkCluster.setClusterId(cluster.getClusterId());
-      flinkCluster.setVersionId(cluster.getVersionId());
-      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-      flinkCluster.setOptions(cluster.getOptions());
-      flinkCluster.setResolveOrder(cluster.getResolveOrder());
-      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
-      flinkCluster.setK8sConf(cluster.getK8sConf());
-      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
-      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
-      flinkCluster.setServiceAccount(cluster.getServiceAccount());
-      flinkCluster.setFlinkImage(cluster.getFlinkImage());
-      flinkCluster.setYarnQueue(cluster.getYarnQueue());
-    }
-  }
-
   @Nullable
   private KubernetesDeployParam getKubernetesDeployDesc(
       @Nonnull FlinkCluster flinkCluster, String action) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 7d2b53d43..e190b6e6a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -39,7 +39,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.mapper.ProjectMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.ProjectService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.console.core.task.ProjectBuildTask;
 
 import org.apache.flink.configuration.MemorySize;
@@ -81,7 +81,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
 
   @Autowired private ApplicationService applicationService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
 
   private final ExecutorService executorService =
       new ThreadPoolExecutor(
@@ -206,7 +206,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
               if (buildState == BuildState.SUCCESSFUL) {
                 baseMapper.updateBuildTime(id);
               }
-              flinkRESTAPIWatcher.init();
+              flinkHttpWatcher.init();
             },
             fileLogger -> {
               List<Application> applications =
@@ -221,7 +221,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
                     app.setBuild(true);
                     this.applicationService.updateRelease(app);
                   });
-              flinkRESTAPIWatcher.init();
+              flinkHttpWatcher.init();
             });
     CompletableFuture<Void> buildTask =
         CompletableFuture.runAsync(projectBuildTask, executorService);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 22be1d5e1..2ed5bb3b5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavePointService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.SavepointResponse;
 import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -101,7 +101,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
 
   private final ExecutorService executorService =
       new ThreadPoolExecutor(
@@ -176,12 +176,12 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     applicationLog.setOptionTime(new Date());
     applicationLog.setYarnAppId(application.getClusterId());
 
-    FlinkRESTAPIWatcher.addSavepoint(application.getId());
+    FlinkHttpWatcher.addSavepoint(application.getId());
 
     application.setOptionState(OptionState.SAVEPOINTING.getValue());
     application.setOptionTime(new Date());
     this.applicationService.updateById(application);
-    flinkRESTAPIWatcher.init();
+    flinkHttpWatcher.init();
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
@@ -266,7 +266,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
               application.setOptionState(OptionState.NONE.getValue());
               application.setOptionTime(new Date());
               applicationService.update(application);
-              flinkRESTAPIWatcher.init();
+              flinkHttpWatcher.init();
             });
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index f96217b92..2c17f51f2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -69,7 +69,7 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
 
   public void process(Application application, @Nonnull CheckPoints checkPoints) {
     checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(application, checkPoint));
@@ -85,7 +85,7 @@ public class CheckpointProcessor {
       if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
         savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
         saveSavepoint(checkPoint, application.getId());
-        flinkRESTAPIWatcher.cleanSavepoint(application);
+        flinkHttpWatcher.cleanSavepoint(application);
         return;
       }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 29131f651..ed545b0dd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -35,14 +35,14 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.client5.http.config.RequestConfig;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
 import java.time.Duration;
 import java.util.Date;
@@ -65,7 +65,7 @@ public class FlinkClusterWatcher {
 
   @Autowired private ApplicationService applicationService;
 
-  private Long lastWatcheringTime = 0L;
+  private Long lastWatchTime = 0L;
 
   // Track interval  every 30 seconds
   private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
@@ -73,6 +73,11 @@ public class FlinkClusterWatcher {
   /** Watcher cluster lists */
   private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);
 
+  private static final Cache<Long, ClusterState> FAILED_STATES =
+      Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();
+
+  private boolean immediateWatch = false;
+
   /** Thread pool for processing status monitoring for each cluster */
   private static final ExecutorService EXECUTOR =
       new ThreadPoolExecutor(
@@ -95,71 +100,33 @@ public class FlinkClusterWatcher {
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
   }
 
-  /** flinkcluster persistent */
-  @PreDestroy
-  private void stop() {
-    // TODO: flinkcluster persistent
-  }
-
   @Scheduled(fixedDelay = 1000)
   private void start() {
-    if (System.currentTimeMillis() - lastWatcheringTime >= WATCHER_INTERVAL.toMillis()) {
-      lastWatcheringTime = System.currentTimeMillis();
-      watcher();
-    }
-  }
-
-  private void watcher() {
-    for (Map.Entry<Long, FlinkCluster> entry : WATCHER_CLUSTERS.entrySet()) {
-      EXECUTOR.execute(
-          () -> {
-            FlinkCluster flinkCluster = entry.getValue();
-            updateClusterState(flinkCluster);
-          });
-    }
-  }
-
-  private ClusterState updateClusterState(FlinkCluster flinkCluster) {
-    Integer clusterExecutionMode = flinkCluster.getExecutionMode();
-    if (!ExecutionMode.isKubernetesSessionMode(clusterExecutionMode)) {
-      ClusterState state = getClusterState(flinkCluster);
-      handleClusterState(flinkCluster, state);
-      return state;
-    } else {
-      // TODO: K8s Session status monitoring
-      return ClusterState.UNKNOWN;
-    }
-  }
-
-  public synchronized boolean verifyClusterValidByClusterId(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    ClusterState state = ClusterState.of(flinkCluster.getClusterState());
-    if (!ClusterState.isRunningState(state)) {
-      return false;
-    }
-    state = updateClusterState(flinkCluster);
-    if (!ClusterState.isRunningState(state)) {
-      return false;
+    Long timeMillis = System.currentTimeMillis();
+    if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
+      lastWatchTime = timeMillis;
+      immediateWatch = false;
+      WATCHER_CLUSTERS.forEach(
+          (aLong, flinkCluster) ->
+              EXECUTOR.execute(
+                  () -> {
+                    ClusterState state = getClusterState(flinkCluster);
+                    if (ClusterState.isFailed(state)) {
+                      flinkClusterService.updateClusterFinalState(flinkCluster.getId(), state);
+                      unWatching(flinkCluster);
+                      alert(flinkCluster, state);
+                    }
+                  }));
     }
-    return true;
-  }
-
-  public boolean checkAlert(Long clusterId) {
-    FlinkCluster flinkCluster = flinkClusterService.getById(clusterId);
-    if (flinkCluster.getAlertId() == null) {
-      return false;
-    }
-    return true;
   }
 
   private void alert(FlinkCluster cluster, ClusterState state) {
-    if (!checkAlert(cluster.getId())) {
-      return;
+    if (cluster.getAlertId() != null) {
+      cluster.setJobs(applicationService.countJobsByClusterId(cluster.getId()));
+      cluster.setClusterState(state.getValue());
+      cluster.setEndTime(new Date());
+      alertService.alert(cluster, state);
     }
-    cluster.setJobs(applicationService.getAffectedJobsByClusterId(cluster.getId()));
-    cluster.setClusterState(state.getValue());
-    cluster.setEndTime(new Date());
-    alertService.alert(cluster, state);
   }
 
   /**
@@ -168,13 +135,29 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterState(FlinkCluster flinkCluster) {
-    ClusterState state = getClusterStateFromFlinkAPI(flinkCluster);
-    if (ClusterState.isRunningState(state)) {
+  public ClusterState getClusterState(FlinkCluster flinkCluster) {
+    ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
+    if (state != null) {
       return state;
+    }
+    switch (flinkCluster.getExecutionModeEnum()) {
+      case REMOTE:
+        state = httpRemoteClusterState(flinkCluster);
+        break;
+      case YARN_SESSION:
+        state = httpYarnSessionClusterState(flinkCluster);
+        break;
+      default:
+        state = ClusterState.UNKNOWN;
+        break;
+    }
+    if (ClusterState.isRunning(state)) {
+      FAILED_STATES.invalidate(flinkCluster.getId());
     } else {
-      return getClusterStateFromYarnAPI(flinkCluster);
+      immediateWatch = true;
+      FAILED_STATES.put(flinkCluster.getId(), state);
     }
+    return state;
   }
 
   /**
@@ -183,12 +166,12 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterStateFromFlinkAPI(FlinkCluster flinkCluster) {
+  private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
     final String address = flinkCluster.getAddress();
-    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
     if (StringUtils.isEmpty(address)) {
       return ClusterState.STOPPED;
     }
+    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
     final String flinkUrl =
         StringUtils.isEmpty(jobManagerUrl)
             ? address.concat("/overview")
@@ -198,13 +181,12 @@ public class FlinkClusterWatcher {
           HttpClientUtils.httpGetRequest(
               flinkUrl,
               RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build());
-
       JacksonUtils.read(res, Overview.class);
       return ClusterState.RUNNING;
     } catch (Exception ignored) {
       log.error("cluster id:{} get state from flink api failed", 
flinkCluster.getId());
     }
-    return ClusterState.UNKNOWN;
+    return ClusterState.LOST;
   }
 
   /**
@@ -213,14 +195,7 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    * @return
    */
-  private ClusterState getClusterStateFromYarnAPI(FlinkCluster flinkCluster) {
-    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      return ClusterState.LOST;
-    }
-    String clusterId = flinkCluster.getClusterId();
-    if (StringUtils.isEmpty(clusterId)) {
-      return ClusterState.STOPPED;
-    }
+  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
     String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId());
     try {
       String result = YarnUtils.restRequest(yarnUrl);
@@ -243,64 +218,27 @@ public class FlinkClusterWatcher {
   }
 
   /**
-   * process cluster state
+   * add flinkCluster to watching
    *
    * @param flinkCluster
-   * @param state
    */
-  private void handleClusterState(FlinkCluster flinkCluster, ClusterState state) {
-    LambdaUpdateWrapper<FlinkCluster> updateWrapper =
-        new LambdaUpdateWrapper<FlinkCluster>()
-            .eq(FlinkCluster::getId, flinkCluster.getId())
-            .set(FlinkCluster::getClusterState, state.getValue());
-    switch (state) {
-      case STOPPED:
-        {
-          updateWrapper
-              .set(FlinkCluster::getAddress, null)
-              .set(FlinkCluster::getJobManagerUrl, null)
-              .set(FlinkCluster::getEndTime, new Date());
-        }
-        // fall through
-      case LOST:
-      case UNKNOWN:
-        {
-          removeFlinkCluster(flinkCluster);
-          alert(flinkCluster, state);
-          break;
-        }
+  public static void addWatching(FlinkCluster flinkCluster) {
+    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+      log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
+      WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
     }
-    flinkClusterService.update(updateWrapper);
   }
 
-  /**
-   * Add a cluster to cache
-   *
-   * @param flinkCluster
-   */
-  public static void addFlinkCluster(FlinkCluster flinkCluster) {
+  /** @param flinkCluster */
+  public static void unWatching(FlinkCluster flinkCluster) {
     if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
+      log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
+      WATCHER_CLUSTERS.remove(flinkCluster.getId());
     }
-    log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
-    WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
   }
 
   /**
-   * Remove a cluster from cache
-   *
-   * @param flinkCluster
-   */
-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
-      return;
-    }
-    log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
-    WATCHER_CLUSTERS.remove(flinkCluster.getId());
-  }
-
-  /**
-   * string conver final application status
+   * string converse final application status
    *
    * @param value
    * @return
@@ -321,11 +259,9 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState finalApplicationStatusConvertClusterState(FinalApplicationStatus status) {
-    switch (status) {
-      case UNDEFINED:
-        return ClusterState.RUNNING;
-      default:
-        return ClusterState.STOPPED;
+    if (status == FinalApplicationStatus.UNDEFINED) {
+      return ClusterState.RUNNING;
     }
+    return ClusterState.STOPPED;
   }
 }
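
The FAILED_STATES cache introduced above debounces cluster probes: a
non-running result is remembered for one WATCHER_INTERVAL so getClusterState()
can answer from cache, while immediateWatch lets the next scheduled tick run
without waiting out the full interval. A self-contained sketch of the same
Caffeine pattern (field names mirror the diff; the values are illustrative):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;

    Cache<Long, ClusterState> failedStates =
        Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(30)).build();

    failedStates.put(clusterId, ClusterState.LOST);             // remember a failed probe
    ClusterState cached = failedStates.getIfPresent(clusterId); // non-null until expiry
    failedStates.invalidate(clusterId);                         // cleared once RUNNING is seen
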
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
similarity index 88%
rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 737cc1bc6..f6ed5c96c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -53,6 +54,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -68,7 +70,7 @@ import java.util.stream.Collectors;
 /** This implementation is currently used for tracing flink job on yarn,standalone,remote mode */
 @Slf4j
 @Component
-public class FlinkRESTAPIWatcher {
+public class FlinkHttpWatcher {
 
   @Autowired private ApplicationService applicationService;
 
@@ -83,9 +85,10 @@ public class FlinkRESTAPIWatcher {
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
   // track interval  every 5 seconds
-  private static final long WATCHING_INTERVAL = 1000L * 5;
+  private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+
   // option interval within 10 seconds
-  private static final long OPTION_INTERVAL = 1000L * 10;
+  private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
 
   /**
    *
@@ -138,7 +141,7 @@ public class FlinkRESTAPIWatcher {
 
   private static final Map<Long, OptionState> OPTIONING = new ConcurrentHashMap<>(0);
 
-  private Long lastWatchingTime = 0L;
+  private Long lastWatchTime = 0L;
 
   private Long lastOptionTime = 0L;
 
@@ -161,13 +164,17 @@ public class FlinkRESTAPIWatcher {
             new LambdaQueryWrapper<Application>()
                 .eq(Application::getTracking, 1)
                 .notIn(Application::getExecutionMode, ExecutionMode.getKubernetesMode()));
-    applications.forEach((app) -> WATCHING_APPS.put(app.getId(), app));
+    applications.forEach(
+        (app) -> {
+          WATCHING_APPS.put(app.getId(), app);
+          STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
+        });
   }
 
   @PreDestroy
   public void doStop() {
     log.info(
-        "FlinkRESTAPIWatcher StreamPark Console will be shutdown,persistent application to database.");
+        "FlinkHttpWatcher StreamPark Console will be shutdown,persistent application to database.");
     WATCHING_APPS.forEach((k, v) -> applicationService.persistMetrics(v));
   }
 
@@ -181,35 +188,22 @@ public class FlinkRESTAPIWatcher {
    */
   @Scheduled(fixedDelay = 1000)
   public void start() {
-    // The application has been started at the first time, or the front-end is operating start/stop,
-    // need to return status info immediately.
-    if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
-      doWatch();
-    } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) {
-      // The last operation time is less than option interval.(10 seconds)
-      doWatch();
-    } else if (System.currentTimeMillis() - lastWatchingTime >= WATCHING_INTERVAL) {
-      // Normal information obtain, check if there is 5 seconds interval between this time and the
-      // last time.(once every 5 seconds)
-      doWatch();
+    Long timeMillis = System.currentTimeMillis();
+    if (lastWatchTime == null
+        || !OPTIONING.isEmpty()
+        || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
+        || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
+      lastWatchTime = timeMillis;
+      WATCHING_APPS.forEach(this::watch);
     }
   }
 
-  private void doWatch() {
-    lastWatchingTime = System.currentTimeMillis();
-    for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
-      watch(entry.getKey(), entry.getValue());
-    }
-  }
-
-  private void watch(Long key, Application application) {
+  private void watch(Long id, Application application) {
     EXECUTOR.execute(
         () -> {
           final StopFrom stopFrom =
-              STOP_FROM_MAP.getOrDefault(key, null) == null
-                  ? StopFrom.NONE
-                  : STOP_FROM_MAP.get(key);
-          final OptionState optionState = OPTIONING.get(key);
+              STOP_FROM_MAP.getOrDefault(id, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(id);
+          final OptionState optionState = OPTIONING.get(id);
           try {
             // query status from flink rest api
             getFromFlinkRestApi(application, stopFrom);
@@ -226,11 +220,11 @@ public class FlinkRESTAPIWatcher {
                 // non-mapping
                 if (application.getState() != FlinkAppState.MAPPING.getValue()) {
                   log.error(
-                      "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
+                      "FlinkHttpWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
                   if (StopFrom.NONE.equals(stopFrom)) {
                     savePointService.expire(application.getId());
                     application.setState(FlinkAppState.LOST.getValue());
-                    alert(application, FlinkAppState.LOST);
+                    doAlert(application, FlinkAppState.LOST);
                   } else {
                     application.setState(FlinkAppState.CANCELED.getValue());
                   }
@@ -242,11 +236,11 @@ public class FlinkRESTAPIWatcher {
                 */
                 application.setEndTime(new Date());
                 cleanSavepoint(application);
-                cleanOptioning(optionState, key);
+                cleanOptioning(optionState, id);
                 doPersistMetrics(application, true);
                FlinkAppState appState = FlinkAppState.of(application.getState());
                if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
-                  alert(application, FlinkAppState.of(application.getState()));
+                  doAlert(application, FlinkAppState.of(application.getState()));
                   if (appState.equals(FlinkAppState.FAILED)) {
                     try {
                       applicationService.start(application, true);
@@ -341,13 +335,13 @@ public class FlinkRESTAPIWatcher {
 
     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
+      STARTING_CACHE.invalidate(application.getId());
       Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
         application.setAvailableSlot(override.getSlotsAvailable());
       }
-      STARTING_CACHE.invalidate(application.getId());
     }
   }
 
@@ -447,18 +441,18 @@ public class FlinkRESTAPIWatcher {
         break;
       case CANCELED:
         log.info(
-            "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {}, stop 
tracking and delete stopFrom!",
+            "FlinkHttpWatcher getFromFlinkRestApi, job state {}, stop tracking 
and delete stopFrom!",
             currentState.name());
         cleanSavepoint(application);
         application.setState(currentState.getValue());
        if (StopFrom.NONE.equals(stopFrom) || applicationService.checkAlter(application)) {
           if (StopFrom.NONE.equals(stopFrom)) {
             log.info(
-                "FlinkRESTAPIWatcher getFromFlinkRestApi, job cancel is not 
form StreamPark,savePoint expired!");
+                "FlinkHttpWatcher getFromFlinkRestApi, job cancel is not form 
StreamPark,savePoint expired!");
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
-          alert(application, FlinkAppState.CANCELED);
+          doAlert(application, FlinkAppState.CANCELED);
         }
         STOP_FROM_MAP.remove(application.getId());
         doPersistMetrics(application, true);
@@ -469,12 +463,12 @@ public class FlinkRESTAPIWatcher {
         STOP_FROM_MAP.remove(application.getId());
         application.setState(FlinkAppState.FAILED.getValue());
         doPersistMetrics(application, true);
-        alert(application, FlinkAppState.FAILED);
+        doAlert(application, FlinkAppState.FAILED);
         applicationService.start(application, true);
         break;
       case RESTARTING:
         log.info(
-            "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {},add to 
starting",
+            "FlinkHttpWatcher getFromFlinkRestApi, job state {},add to 
starting",
             currentState.name());
         STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
         break;
@@ -492,7 +486,7 @@ public class FlinkRESTAPIWatcher {
    * @param stopFrom stopFrom
    */
  private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception {
-    log.debug("FlinkRESTAPIWatcher getFromYarnRestApi starting...");
+    log.debug("FlinkHttpWatcher getFromYarnRestApi starting...");
     OptionState optionState = OPTIONING.get(application.getId());
 
     /*
@@ -502,10 +496,10 @@ public class FlinkRESTAPIWatcher {
     */
     Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
     if (flag != null) {
-      log.info("FlinkRESTAPIWatcher previous state: canceling.");
+      log.info("FlinkHttpWatcher previous state: canceling.");
       if (StopFrom.NONE.equals(stopFrom)) {
         log.error(
-            "FlinkRESTAPIWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
+            "FlinkHttpWatcher query previous state was canceling and stopFrom 
NotFound,savePoint expired!");
         savePointService.expire(application.getId());
       }
       application.setState(FlinkAppState.CANCELED.getValue());
@@ -517,7 +511,7 @@ public class FlinkRESTAPIWatcher {
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
       if (yarnAppInfo == null) {
         if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-          throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi 
failed ");
+          throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi 
failed ");
         }
       } else {
         try {
@@ -529,7 +523,7 @@ public class FlinkRESTAPIWatcher {
           if (FlinkAppState.KILLED.equals(flinkAppState)) {
             if (StopFrom.NONE.equals(stopFrom)) {
               log.error(
-                  "FlinkRESTAPIWatcher getFromYarnRestApi,job was killed and 
stopFrom NotFound,savePoint expired!");
+                  "FlinkHttpWatcher getFromYarnRestApi,job was killed and 
stopFrom NotFound,savePoint expired!");
               savePointService.expire(application.getId());
             }
             flinkAppState = FlinkAppState.CANCELED;
@@ -546,7 +540,7 @@ public class FlinkRESTAPIWatcher {
               || flinkAppState.equals(FlinkAppState.LOST)
              || (flinkAppState.equals(FlinkAppState.CANCELED) && StopFrom.NONE.equals(stopFrom))
               || applicationService.checkAlter(application)) {
-            alert(application, flinkAppState);
+            doAlert(application, flinkAppState);
             stopCanceledJob(application.getId());
             if (flinkAppState.equals(FlinkAppState.FAILED)) {
               applicationService.start(application, true);
@@ -554,7 +548,7 @@ public class FlinkRESTAPIWatcher {
           }
         } catch (Exception e) {
          if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-            throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi 
error,", e);
+            throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi 
error,", e);
           }
         }
       }
@@ -578,7 +572,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher setOptioning");
+    log.info("FlinkHttpWatcher setOptioning");
     OPTIONING.put(appId, state);
     if (state.equals(OptionState.CANCELLING)) {
       STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -589,7 +583,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(application)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}", 
application.getId());
+    log.info("FlinkHttpWatcher add app to tracking,appId:{}", 
application.getId());
     WATCHING_APPS.put(application.getId(), application);
     STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
   }
@@ -598,7 +592,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
+    log.info("FlinkHttpWatcher add app to savepoint,appId:{}", appId);
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }
 
@@ -606,7 +600,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher stop app,appId:{}", appId);
+    log.info("FlinkHttpWatcher stop app,appId:{}", appId);
     WATCHING_APPS.remove(appId);
   }
 
@@ -781,19 +775,26 @@ public class FlinkRESTAPIWatcher {
   * alarm; If the abnormal behavior of the job is caused by itself and the flink cluster is running
   * normally, the job will send an alarm
    */
-  private void alert(Application app, FlinkAppState appState) {
-    if (ExecutionMode.isYarnPerJobOrAppMode(app.getExecutionModeEnum())
-        || !flinkClusterWatcher.checkAlert(app.getFlinkClusterId())) {
-      alertService.alert(app, appState);
-      return;
-    }
-    boolean isValid = flinkClusterWatcher.verifyClusterValidByClusterId(app.getFlinkClusterId());
-    if (isValid) {
-      log.info(
-          "application with id {} is yarn session or remote and flink cluster 
with id {} is alive, application send alert",
-          app.getId(),
-          app.getFlinkClusterId());
-      alertService.alert(app, appState);
+  private void doAlert(Application app, FlinkAppState appState) {
+    switch (app.getExecutionModeEnum()) {
+      case YARN_APPLICATION:
+      case YARN_PER_JOB:
+        alertService.alert(app, appState);
+        return;
+      case YARN_SESSION:
+      case REMOTE:
+        FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
+        ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
+        if (ClusterState.isRunning(clusterState)) {
+          log.info(
+              "application with id {} is yarn session or remote and flink 
cluster with id {} is alive, application send alert",
+              app.getId(),
+              app.getFlinkClusterId());
+          alertService.alert(app, appState);
+        }
+        break;
+      default:
+        break;
     }
   }
 }
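
The rewritten start() above collapses the three cascading doWatch() branches into a single guard over two intervals: poll immediately on first run or while a start/stop option is in flight or recent, otherwise only once per watch interval. Below is a minimal standalone sketch of that guard; only the if-condition mirrors the diff, while the class scaffolding, map value types, and the concrete 10s/5s durations are illustrative assumptions.

    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical scaffolding around the polling guard from start().
    public class WatchThrottle {
      // Names mirror the diff; the durations here are assumptions.
      private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
      private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);

      private final Map<Long, Object> watchingApps = new ConcurrentHashMap<>();
      private final Map<Long, Object> optioning = new ConcurrentHashMap<>();

      private Long lastWatchTime;   // null until the first pass, forcing a poll
      private long lastOptionTime;  // stamped when a user start/stop is issued

      // Meant to run on a fixed 1s tick, as @Scheduled(fixedDelay = 1000) does.
      public void start() {
        long now = System.currentTimeMillis();
        boolean firstRun = lastWatchTime == null;
        boolean optionInFlight = !optioning.isEmpty();
        boolean recentOption = now - lastOptionTime <= OPTION_INTERVAL.toMillis();
        boolean watchDue = !firstRun && now - lastWatchTime >= WATCHING_INTERVAL.toMillis();
        if (firstRun || optionInFlight || recentOption || watchDue) {
          lastWatchTime = now;                 // stamp before fanning out
          watchingApps.forEach(this::watch);   // one poll per tracked app
        }
      }

      private void watch(Long id, Object app) {
        // the real watcher hits the Flink REST API, falling back to YARN
      }
    }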
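The alert() to doAlert() rewrite replaces the checkAlert/verifyClusterValidByClusterId calls with an explicit dispatch on execution mode. Reduced to a pure decision function, the rule looks like the sketch below; the enum values and isRunning helper follow the diff, while AlertDecision itself is hypothetical.

    enum ExecutionMode { YARN_APPLICATION, YARN_PER_JOB, YARN_SESSION, REMOTE }

    enum ClusterState {
      CREATED, RUNNING, STOPPED, LOST, UNKNOWN;

      static boolean isRunning(ClusterState state) {
        return RUNNING == state;
      }
    }

    final class AlertDecision {
      // Per-job/application deployments own their cluster, so the job alerts
      // unconditionally. Session/remote jobs alert only while the shared
      // cluster is RUNNING: if the cluster itself is down, the alarm belongs
      // to the cluster watcher rather than to each job.
      static boolean shouldAlertJob(ExecutionMode mode, ClusterState cluster) {
        switch (mode) {
          case YARN_APPLICATION:
          case YARN_PER_JOB:
            return true;
          case YARN_SESSION:
          case REMOTE:
            return ClusterState.isRunning(cluster);
          default:
            return false;
        }
      }
    }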
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index cc0173387..04005c724 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -133,7 +133,7 @@
              limit 1
     </select>
 
-    <select id="getAffectedJobsByClusterId" resultType="java.lang.Integer" 
parameterType="java.lang.Long">
+    <select id="countJobsByClusterId" resultType="java.lang.Integer" 
parameterType="java.lang.Long">
         select
             count(1)
         from t_flink_app
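
The XML change renames the statement getAffectedJobsByClusterId to countJobsByClusterId, matching what the count(1) query actually returns. The Java side of the mapper is not shown in this excerpt; a plausible counterpart, inferred only from the declared parameterType and resultType, would be:

    // Hypothetical Java-side counterpart to the renamed <select>; the actual
    // ApplicationMapper interface is not part of this excerpt.
    public interface ApplicationMapper {
      /** Count of jobs bound to the given Flink cluster. */
      Integer countJobsByClusterId(Long clusterId);
    }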
