This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch improve
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a53e4867347df6224993c7feb08078ebc9c851f5
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 17 09:51:02 2023 +0800

    [Improve] application state improvement
---
 .../console/core/annotation/AppUpdated.java        |  5 +--
 .../console/core/aspect/StreamParkAspect.java      |  6 ++--
 .../console/core/bean/AlertProbeMsg.java           | 29 ++++++++-------
 .../console/core/entity/Application.java           | 24 ++++---------
 .../impl/ApplicationActionServiceImpl.java         | 18 +++++-----
 .../impl/ApplicationInfoServiceImpl.java           | 15 ++++----
 .../impl/ApplicationManageServiceImpl.java         |  6 ++--
 .../core/service/impl/AppBuildPipeServiceImpl.java | 12 +++----
 .../core/service/impl/ProjectServiceImpl.java      |  8 ++---
 .../core/service/impl/SavePointServiceImpl.java    | 10 +++---
 ...nkHttpWatcher.java => FlinkAppHttpWatcher.java} | 42 +++++++++++-----------
 ...thProbingTask.java => FlinkAppLostWatcher.java} | 40 +++++++++------------
 ...rocessor.java => FlinkCheckpointProcessor.java} |  6 ++--
 .../core/task/FlinkK8sChangeEventListener.java     |  6 ++--
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  2 +-
 .../resources/mapper/core/ApplicationMapper.xml    |  6 ++--
 .../service/ApplicationManageServiceITest.java     |  8 ++---
 17 files changed, 114 insertions(+), 129 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 97c77a79d..cfb2a5563 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.core.annotation;
 
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
+
 import org.aspectj.lang.ProceedingJoinPoint;
 
 import java.lang.annotation.ElementType;
@@ -29,8 +31,7 @@ import java.lang.annotation.Target;
  * application state update, need to add this annotation, This annotation 
marks which methods will
  * cause the application to be updated, Will work together with {@link
  * 
org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)},
 The
- * final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkHttpWatcher#WATCHING_APPS}, 
Make the state of the
+ * final purpose will be refresh {@link FlinkAppHttpWatcher#WATCHING_APPS}, 
Make the state of the
  * job consistent with the database
  */
 @Target(ElementType.METHOD)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 6d14119d5..a15583333 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -28,7 +28,7 @@ import 
org.apache.streampark.console.core.enums.PermissionType;
 import org.apache.streampark.console.core.enums.UserType;
 import org.apache.streampark.console.core.service.CommonService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.system.entity.AccessToken;
 import org.apache.streampark.console.system.entity.Member;
 import org.apache.streampark.console.system.entity.User;
@@ -58,7 +58,7 @@ import java.util.Objects;
 @Aspect
 public class StreamParkAspect {
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
   @Autowired private CommonService commonService;
   @Autowired private MemberService memberService;
   @Autowired private ApplicationManageService applicationManageService;
@@ -93,7 +93,7 @@ public class StreamParkAspect {
     MethodSignature methodSignature = (MethodSignature) 
joinPoint.getSignature();
     log.debug("appUpdated aspect, method:{}", methodSignature.getName());
     Object target = joinPoint.proceed();
-    flinkHttpWatcher.init();
+    flinkAppHttpWatcher.init();
     return target;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
index d42537746..f9bb31604 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertProbeMsg.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.core.bean;
 
+import org.apache.streampark.console.core.enums.FlinkAppState;
+
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -38,19 +40,20 @@ public class AlertProbeMsg {
 
   private Integer cancelledJobs = 0;
 
-  public void incrementProbeJobs() {
+  public void compute(FlinkAppState state) {
     this.probeJobs++;
-  }
-
-  public void incrementFailedJobs() {
-    this.failedJobs++;
-  }
-
-  public void incrementLostJobs() {
-    this.lostJobs++;
-  }
-
-  public void incrementCancelledJobs() {
-    this.cancelledJobs++;
+    switch (state) {
+      case LOST:
+        this.lostJobs++;
+        break;
+      case FAILED:
+        failedJobs++;
+        break;
+      case CANCELED:
+        cancelledJobs++;
+        break;
+      default:
+        break;
+    }
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index f0b283be1..98fd8dd53 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -49,8 +49,6 @@ import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import javax.annotation.Nonnull;
-
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Date;
@@ -59,8 +57,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-import static org.apache.streampark.console.core.enums.FlinkAppState.of;
-
 @Data
 @TableName("t_flink_app")
 @Slf4j
@@ -275,8 +271,7 @@ public class Application implements Serializable {
 
   public void setState(Integer state) {
     this.state = state;
-    FlinkAppState appState = of(this.state);
-    this.tracking = shouldTracking(appState);
+    this.tracking = shouldTracking() ? 1 : 0;
   }
 
   public void setYarnQueueByHotParams() {
@@ -301,8 +296,8 @@ public class Application implements Serializable {
    *
    * @return 1: need to be tracked | 0: no need to be tracked.
    */
-  public static Integer shouldTracking(@Nonnull FlinkAppState state) {
-    switch (state) {
+  public Boolean shouldTracking() {
+    switch (getStateEnum()) {
       case ADDED:
       case CREATED:
       case FINISHED:
@@ -310,9 +305,9 @@ public class Application implements Serializable {
       case CANCELED:
       case TERMINATED:
       case POS_TERMINATED:
-        return 0;
+        return false;
       default:
-        return 1;
+        return true;
     }
   }
 
@@ -322,8 +317,7 @@ public class Application implements Serializable {
    * @return true: can start | false: can not start.
    */
   public boolean isCanBeStart() {
-    FlinkAppState state = FlinkAppState.of(getState());
-    switch (state) {
+    switch (getStateEnum()) {
       case ADDED:
       case CREATED:
       case FAILED:
@@ -340,10 +334,6 @@ public class Application implements Serializable {
     }
   }
 
-  public boolean shouldBeTrack() {
-    return shouldTracking(FlinkAppState.of(getState())) == 1;
-  }
-
   @JsonIgnore
   public ReleaseState getReleaseState() {
     return ReleaseState.of(release);
@@ -355,7 +345,7 @@ public class Application implements Serializable {
   }
 
   @JsonIgnore
-  public FlinkAppState getFlinkAppStateEnum() {
+  public FlinkAppState getStateEnum() {
     return FlinkAppState.of(state);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 451ffc453..728f3b7f4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -65,7 +65,7 @@ import 
org.apache.streampark.console.core.service.VariableService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -227,7 +227,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
   @Override
   public void cancel(Application appParam) throws Exception {
-    FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
+    FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionState.CANCELLING);
     Application application = getById(appParam.getId());
     application.setState(FlinkAppState.CANCELLING.getValue());
 
@@ -239,7 +239,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     applicationLog.setYarnAppId(application.getClusterId());
 
     if (appParam.getSavePointed()) {
-      FlinkHttpWatcher.addSavepoint(application.getId());
+      FlinkAppHttpWatcher.addSavepoint(application.getId());
       application.setOptionState(OptionState.SAVEPOINTING.getValue());
     } else {
       application.setOptionState(OptionState.CANCELLING.getValue());
@@ -250,7 +250,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     Long userId = commonService.getUserId();
     if (!application.getUserId().equals(userId)) {
-      FlinkHttpWatcher.addCanceledApp(application.getId(), userId);
+      FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
     }
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -357,7 +357,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
                   k8SFlinkTrackMonitor.unWatching(id);
                   k8SFlinkTrackMonitor.doWatching(id);
                 } else {
-                  FlinkHttpWatcher.unWatching(application.getId());
+                  FlinkAppHttpWatcher.unWatching(application.getId());
                 }
 
                 String exception = Utils.stringifyException(e);
@@ -508,8 +508,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
               if (isKubernetesApp(application)) {
                 k8SFlinkTrackMonitor.doWatching(toTrackId(application));
               } else {
-                FlinkHttpWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
-                FlinkHttpWatcher.doWatching(application);
+                FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
+                FlinkAppHttpWatcher.doWatching(application);
               }
 
               applicationLog.setSuccess(true);
@@ -530,7 +530,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
                 if (isKubernetesApp(app)) {
                   k8SFlinkTrackMonitor.unWatching(toTrackId(app));
                 } else {
-                  FlinkHttpWatcher.unWatching(appParam.getId());
+                  FlinkAppHttpWatcher.unWatching(appParam.getId());
                 }
               }
             })
@@ -750,7 +750,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       k8SFlinkTrackMonitor.unWatching(id);
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
-      FlinkHttpWatcher.unWatching(application.getId());
+      FlinkAppHttpWatcher.unWatching(application.getId());
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index a292cde75..ca7ad1e04 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -40,8 +40,8 @@ import 
org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavePointService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.FlinkClusterWatcher;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
@@ -115,7 +115,7 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
     Integer runningJob = 0;
 
     // stat metrics from other than kubernetes mode
-    for (Application app : FlinkHttpWatcher.getWatchingApps()) {
+    for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
       if (!teamId.equals(app.getTeamId())) {
         continue;
       }
@@ -220,11 +220,10 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
   @Override
   public boolean checkAlter(Application appParam) {
     Long appId = appParam.getId();
-    FlinkAppState state = FlinkAppState.of(appParam.getState());
-    if (!FlinkAppState.CANCELED.equals(state)) {
+    if (FlinkAppState.CANCELED != appParam.getStateEnum()) {
       return false;
     }
-    long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
+    long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
     long appUserId = appParam.getUserId();
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
@@ -244,11 +243,11 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
   @Override
   public boolean existsRunningByClusterId(Long clusterId) {
     return baseMapper.existsRunningJobByClusterId(clusterId)
-        || FlinkHttpWatcher.getWatchingApps().stream()
+        || FlinkAppHttpWatcher.getWatchingApps().stream()
             .anyMatch(
                 application ->
                     clusterId.equals(application.getFlinkClusterId())
-                        && 
FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()));
+                        && 
FlinkAppState.RUNNING.equals(application.getStateEnum()));
   }
 
   @Override
@@ -467,7 +466,7 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
         flinkK8sObserver.watchApplication(application);
       }
     } else {
-      FlinkHttpWatcher.doWatching(application);
+      FlinkAppHttpWatcher.doWatching(application);
     }
     return mapping;
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index e89b16117..2dc38afb2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -53,7 +53,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
@@ -189,7 +189,7 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
         flinkK8sObserver.unWatchById(application.getId());
       }
     } else {
-      FlinkHttpWatcher.unWatching(appParam.getId());
+      FlinkAppHttpWatcher.unWatching(appParam.getId());
     }
     return true;
   }
@@ -264,7 +264,7 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
                                       .getCode()
                                       .equals(record.getBuildStatus()))
                           .setAllowStart(
-                              !record.shouldBeTrack()
+                              !record.shouldTracking()
                                   && PipelineStatus.success
                                       .getCode()
                                       .equals(record.getBuildStatus()))
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 147137800..a1fb0b4e8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -59,7 +59,7 @@ import 
org.apache.streampark.console.core.service.SettingService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.DependencyInfo;
@@ -141,7 +141,7 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   @Autowired private ApplicationConfigService applicationConfigService;
 
@@ -226,8 +226,8 @@ public class AppBuildPipeServiceImpl
             app.setRelease(ReleaseState.RELEASING.get());
             applicationManageService.updateRelease(app);
 
-            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
-              flinkHttpWatcher.init();
+            if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+              flinkAppHttpWatcher.init();
             }
 
             // 1) checkEnv
@@ -358,8 +358,8 @@ public class AppBuildPipeServiceImpl
             }
             applicationManageService.updateRelease(app);
             applicationLogService.save(applicationLog);
-            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
-              flinkHttpWatcher.init();
+            if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+              flinkAppHttpWatcher.init();
             }
           }
         });
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index c14b66b5e..254818045 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -38,7 +38,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.mapper.ProjectMapper;
 import org.apache.streampark.console.core.service.ProjectService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.ProjectBuildTask;
 
 import org.apache.flink.configuration.MemorySize;
@@ -82,7 +82,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
 
   @Autowired private ApplicationManageService applicationManageService;
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   private final ExecutorService executorService =
       new ThreadPoolExecutor(
@@ -207,7 +207,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
               if (buildState == BuildState.SUCCESSFUL) {
                 baseMapper.updateBuildTime(id);
               }
-              flinkHttpWatcher.init();
+              flinkAppHttpWatcher.init();
             },
             fileLogger -> {
               List<Application> applications =
@@ -222,7 +222,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
                     app.setBuild(true);
                     this.applicationManageService.updateRelease(app);
                   });
-              flinkHttpWatcher.init();
+              flinkAppHttpWatcher.init();
             });
     CompletableFuture<Void> buildTask =
         CompletableFuture.runAsync(projectBuildTask, executorService);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 766d5a74f..16d09a246 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import 
org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavePointService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.SavepointResponse;
 import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -101,7 +101,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   private final ExecutorService executorService =
       new ThreadPoolExecutor(
@@ -176,12 +176,12 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     applicationLog.setOptionTime(new Date());
     applicationLog.setYarnAppId(application.getClusterId());
 
-    FlinkHttpWatcher.addSavepoint(application.getId());
+    FlinkAppHttpWatcher.addSavepoint(application.getId());
 
     application.setOptionState(OptionState.SAVEPOINTING.getValue());
     application.setOptionTime(new Date());
     this.applicationManageService.updateById(application);
-    flinkHttpWatcher.init();
+    flinkAppHttpWatcher.init();
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
@@ -266,7 +266,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
               application.setOptionState(OptionState.NONE.getValue());
               application.setOptionTime(new Date());
               applicationManageService.update(application);
-              flinkHttpWatcher.init();
+              flinkAppHttpWatcher.init();
             });
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
similarity index 94%
rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index d61018564..ce60a1919 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -75,7 +75,7 @@ import java.util.stream.Collectors;
 /** This implementation is currently used for tracing flink job on 
yarn,standalone,remote mode */
 @Slf4j
 @Component
-public class FlinkHttpWatcher {
+public class FlinkAppHttpWatcher {
 
   @Autowired private ApplicationManageService applicationManageService;
   @Autowired private ApplicationActionService applicationActionService;
@@ -83,7 +83,7 @@ public class FlinkHttpWatcher {
 
   @Autowired private AlertService alertService;
 
-  @Autowired private CheckpointProcessor checkpointProcessor;
+  @Autowired private FlinkCheckpointProcessor checkpointProcessor;
 
   @Autowired private FlinkClusterService flinkClusterService;
 
@@ -182,7 +182,7 @@ public class FlinkHttpWatcher {
   @PreDestroy
   public void doStop() {
     log.info(
-        "FlinkHttpWatcher StreamPark Console will be shutdown,persistent 
application to database.");
+        "FlinkAppHttpWatcher StreamPark Console will be shutdown,persistent 
application to database.");
     WATCHING_APPS.forEach((k, v) -> applicationInfoService.persistMetrics(v));
   }
 
@@ -209,7 +209,7 @@ public class FlinkHttpWatcher {
   @VisibleForTesting
   public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
     Application app = WATCHING_APPS.get(appId);
-    return (app == null || app.getState() == null) ? null : 
FlinkAppState.of(app.getState());
+    return (app == null || app.getState() == null) ? null : app.getStateEnum();
   }
 
   private void watch(Long id, Application application) {
@@ -234,7 +234,7 @@ public class FlinkHttpWatcher {
                 // non-mapping
                 if (application.getState() != 
FlinkAppState.MAPPING.getValue()) {
                   log.error(
-                      "FlinkHttpWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
+                      "FlinkAppHttpWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
                   if (StopFrom.NONE.equals(stopFrom)) {
                     savePointService.expire(application.getId());
                     application.setState(FlinkAppState.LOST.getValue());
@@ -252,9 +252,9 @@ public class FlinkHttpWatcher {
                 cleanSavepoint(application);
                 cleanOptioning(optionState, id);
                 doPersistMetrics(application, true);
-                FlinkAppState appState = 
FlinkAppState.of(application.getState());
-                if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
-                  doAlert(application, 
FlinkAppState.of(application.getState()));
+                FlinkAppState appState = application.getStateEnum();
+                if (appState == FlinkAppState.FAILED || appState == 
FlinkAppState.LOST) {
+                  doAlert(application, application.getStateEnum());
                   if (appState.equals(FlinkAppState.FAILED)) {
                     try {
                       applicationActionService.start(application, true);
@@ -456,14 +456,14 @@ public class FlinkHttpWatcher {
         break;
       case CANCELED:
         log.info(
-            "FlinkHttpWatcher getFromFlinkRestApi, job state {}, stop tracking 
and delete stopFrom!",
+            "FlinkAppHttpWatcher getFromFlinkRestApi, job state {}, stop 
tracking and delete stopFrom!",
             currentState.name());
         cleanSavepoint(application);
         application.setState(currentState.getValue());
         if (StopFrom.NONE.equals(stopFrom) || 
applicationInfoService.checkAlter(application)) {
           if (StopFrom.NONE.equals(stopFrom)) {
             log.info(
-                "FlinkHttpWatcher getFromFlinkRestApi, job cancel is not form 
StreamPark,savePoint expired!");
+                "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not 
form StreamPark,savePoint expired!");
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
@@ -483,7 +483,7 @@ public class FlinkHttpWatcher {
         break;
       case RESTARTING:
         log.info(
-            "FlinkHttpWatcher getFromFlinkRestApi, job state {},add to 
starting",
+            "FlinkAppHttpWatcher getFromFlinkRestApi, job state {},add to 
starting",
             currentState.name());
         STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
         break;
@@ -501,7 +501,7 @@ public class FlinkHttpWatcher {
    * @param stopFrom stopFrom
    */
   private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception {
-    log.debug("FlinkHttpWatcher getFromYarnRestApi starting...");
+    log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting...");
     OptionState optionState = OPTIONING.get(application.getId());
 
     /*
@@ -511,10 +511,10 @@ public class FlinkHttpWatcher {
     */
     Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
     if (flag != null) {
-      log.info("FlinkHttpWatcher previous state: canceling.");
+      log.info("FlinkAppHttpWatcher previous state: canceling.");
       if (StopFrom.NONE.equals(stopFrom)) {
         log.error(
-            "FlinkHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
+            "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
         savePointService.expire(application.getId());
       }
       application.setState(FlinkAppState.CANCELED.getValue());
@@ -526,7 +526,7 @@ public class FlinkHttpWatcher {
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
       if (yarnAppInfo == null) {
         if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-          throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi failed ");
+          throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi failed ");
         }
       } else {
         try {
@@ -538,7 +538,7 @@ public class FlinkHttpWatcher {
           if (FlinkAppState.KILLED.equals(flinkAppState)) {
             if (StopFrom.NONE.equals(stopFrom)) {
               log.error(
-                  "FlinkHttpWatcher getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!");
+                  "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!");
               savePointService.expire(application.getId());
             }
             flinkAppState = FlinkAppState.CANCELED;
@@ -563,7 +563,7 @@ public class FlinkHttpWatcher {
           }
         } catch (Exception e) {
           if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-            throw new RuntimeException("FlinkHttpWatcher getFromYarnRestApi error,", e);
+            throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi error,", e);
           }
         }
       }
@@ -587,7 +587,7 @@ public class FlinkHttpWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkHttpWatcher setOptioning");
+    log.info("FlinkAppHttpWatcher setOptioning");
     OPTIONING.put(appId, state);
     if (state.equals(OptionState.CANCELLING)) {
       STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -598,7 +598,7 @@ public class FlinkHttpWatcher {
     if (isKubernetesApp(application)) {
       return;
     }
-    log.info("FlinkHttpWatcher add app to tracking,appId:{}", application.getId());
+    log.info("FlinkAppHttpWatcher add app to tracking,appId:{}", application.getId());
     WATCHING_APPS.put(application.getId(), application);
     STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
   }
@@ -607,7 +607,7 @@ public class FlinkHttpWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkHttpWatcher add app to savepoint,appId:{}", appId);
+    log.info("FlinkAppHttpWatcher add app to savepoint,appId:{}", appId);
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }
 
@@ -615,7 +615,7 @@ public class FlinkHttpWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkHttpWatcher stop app,appId:{}", appId);
+    log.info("FlinkAppHttpWatcher stop app,appId:{}", appId);
     WATCHING_APPS.remove(appId);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
similarity index 83%
rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
index d19865f34..d9f7381fd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AutoHealthProbingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
@@ -36,15 +36,17 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.streampark.console.core.enums.FlinkAppState.LOST;
 import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
 import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
 
 /** This implementation is currently used for probe on yarn,remote,K8s mode */
 @Slf4j
 @Component
-public class AutoHealthProbingTask {
+public class FlinkAppLostWatcher {
 
   @Autowired private ApplicationManageService applicationManageService;
 
@@ -63,14 +65,14 @@ public class AutoHealthProbingTask {
 
   private long lastWatchTime = 0L;
 
-  private Boolean isProbing = false;
+  private AtomicBoolean isProbing = new AtomicBoolean(false);
 
   private Short retryAttempts = PROBE_RETRY_COUNT;
 
   @Scheduled(fixedDelay = 1000)
-  private void schedule() {
+  private void start() {
     long timeMillis = System.currentTimeMillis();
-    if (isProbing) {
+    if (isProbing.get()) {
       if (timeMillis - lastWatchTime >= PROBE_WAIT_INTERVAL.toMillis()) {
         handleProbeResults();
         lastWatchTime = timeMillis;
@@ -78,28 +80,28 @@ public class AutoHealthProbingTask {
     } else {
       if (timeMillis - lastWatchTime >= PROBE_INTERVAL.toMillis()) {
         lastWatchTime = timeMillis;
-        probe(Collections.emptyList());
+        watch(Collections.emptyList());
       }
     }
   }
 
-  public void probe(List<Application> applications) {
+  public void watch(List<Application> applications) {
     List<Application> probeApplication =
         applications.isEmpty() ? applicationManageService.getProbeApps() : applications;
     if (probeApplication.isEmpty()) {
       log.info("there is no application that needs to be probe");
       return;
     }
-    isProbing = true;
+    isProbing.set(true);
     probeApplication =
         probeApplication.stream()
             .filter(application -> FlinkAppState.isLost(application.getState()))
             .collect(Collectors.toList());
-    updateProbingState(probeApplication);
+    updateState(probeApplication);
     probeApplication.stream().forEach(this::monitorApplication);
   }
 
-  private void updateProbingState(List<Application> applications) {
+  private void updateState(List<Application> applications) {
     applications.stream()
         .filter(application -> FlinkAppState.isLost(application.getState()))
         .forEach(
@@ -113,7 +115,7 @@ public class AutoHealthProbingTask {
   private void handleProbeResults() {
     List<Application> probeApps = applicationManageService.getProbeApps();
     if (shouldRetry(probeApps)) {
-      probe(probeApps);
+      watch(probeApps);
     } else {
       List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
       alertProbeMsgs.stream().forEach(this::alert);
@@ -129,7 +131,7 @@ public class AutoHealthProbingTask {
         });
     applicationManageService.updateBatchById(applications);
     retryAttempts = PROBE_RETRY_COUNT;
-    isProbing = false;
+    isProbing.set(false);
   }
 
   private void alert(AlertProbeMsg alertProbeMsg) {
@@ -155,17 +157,9 @@ public class AutoHealthProbingTask {
                           apps.forEach(
                               app -> {
                                 alertProbeMsg.setUser(app.getUserName());
-                                alertProbeMsg.incrementProbeJobs();
-                                if (app.getState() == FlinkAppState.LOST.getValue()) {
-                                  alertProbeMsg.incrementLostJobs();
-                                } else if (app.getState() == FlinkAppState.FAILED.getValue()) {
-                                  alertProbeMsg.incrementFailedJobs();
-                                } else if (app.getState() == FlinkAppState.CANCELED.getValue()) {
-                                  alertProbeMsg.incrementCancelledJobs();
-                                }
+                                alertProbeMsg.compute(app.getStateEnum());
                                 alertIds.add(app.getAlertId());
                               });
-
                           alertProbeMsg.setAlertId(alertIds);
                           return alertProbeMsg;
                         })))
@@ -176,14 +170,12 @@ public class AutoHealthProbingTask {
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.doWatching(toTrackId(application));
     } else {
-      FlinkHttpWatcher.doWatching(application);
+      FlinkAppHttpWatcher.doWatching(application);
     }
   }
 
   private Boolean shouldRetry(List<Application> applications) {
-    return applications.stream()
-            .anyMatch(
-                application -> FlinkAppState.LOST.getValue() == application.getState().intValue())
+    return applications.stream().anyMatch(application -> application.getStateEnum() == LOST)
         && (retryAttempts-- > 0);
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
similarity index 98%
rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
index 02d4f8ca1..5e30557c9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkCheckpointProcessor.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
-public class CheckpointProcessor {
+public class FlinkCheckpointProcessor {
 
   private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
   private static final Integer SAVEPOINT_CACHE_HOUR = 1;
@@ -70,7 +70,7 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   public void process(Application application, @Nonnull CheckPoints checkPoints) {
     checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(application, checkPoint));
@@ -86,7 +86,7 @@ public class CheckpointProcessor {
       if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
         savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
         saveSavepoint(checkPoint, application.getId());
-        flinkHttpWatcher.cleanSavepoint(application);
+        flinkAppHttpWatcher.cleanSavepoint(application);
         return;
       }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 3006a1b6b..45560c35f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -72,7 +72,7 @@ public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private AlertService alertService;
 
-  @Lazy @Autowired private CheckpointProcessor checkpointProcessor;
+  @Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor;
 
   private final ExecutorService executor =
       new ThreadPoolExecutor(
@@ -104,7 +104,7 @@ public class FlinkK8sChangeEventListener {
     applicationInfoService.persistMetrics(app);
 
     // email alerts when necessary
-    FlinkAppState state = FlinkAppState.of(app.getState());
+    FlinkAppState state = app.getStateEnum();
     if (FlinkAppState.FAILED.equals(state)
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
@@ -175,7 +175,7 @@ public class FlinkK8sChangeEventListener {
     // infer the final flink job state
     Enumeration.Value state =
         FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
-            jobStatus.jobState(), toK8sFlinkJobState(FlinkAppState.of(app.getState())));
+            jobStatus.jobState(), toK8sFlinkJobState(app.getStateEnum()));
 
     // corrective start-time / end-time / duration
     long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 7840d06e6..2d5a1a339 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -116,7 +116,7 @@ public class FlinkK8sWatcherWrapper {
     }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+        .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getStateEnum())))
         .map(Bridge::toTrackId)
         .collect(Collectors.toList());
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index c25a504a4..f57b8efe7 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -265,9 +265,9 @@
                 else u.nick_name
                 end as nick_name
         from t_flink_app t
-                 inner join t_user u
-                            on t.user_id = u.user_id
-        where (t.tracking = 1 and t.state = 13) or t.probing = 1
+        inner join t_user u
+        on t.user_id = u.user_id
+        where t.probing = 1 or (t.tracking = 1 and t.state = 13)
     </select>
 
     <update id="mapping" parameterType="org.apache.streampark.console.core.entity.Application">
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
index 58e36b501..79ee5ca43 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
@@ -29,7 +29,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
-import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -45,7 +45,7 @@ import java.util.Base64;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL;
+import static org.apache.streampark.console.core.task.FlinkAppHttpWatcher.WATCHING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -70,7 +70,7 @@ class ApplicationManageServiceITest extends SpringIntegrationTestBase {
 
   @Autowired private FlinkSqlService sqlService;
 
-  @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   @BeforeAll
   static void setup() {
@@ -145,7 +145,7 @@ class ApplicationManageServiceITest extends SpringIntegrationTestBase {
         CompletableFuture.supplyAsync(
             () -> {
               while (true) {
-                if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId())
+                if (flinkAppHttpWatcher.tryQueryFlinkAppState(application.getId())
                     == FlinkAppState.RUNNING) {
                   break;
                 }

Reply via email to