This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new b69b19877 [Improve] executors improvement (#3535)
b69b19877 is described below

commit b69b198779f7b68edd09281e803f2baf8502974d
Author: benjobs <[email protected]>
AuthorDate: Tue Feb 6 12:44:59 2024 +0800

    [Improve] executors improvement (#3535)
    
    * [Improve] ThreadPoolExecutor improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../apache/streampark/common/util/YarnUtils.scala  |  12 +-
 .../console/core/annotation/AppUpdated.java        |   5 +-
 .../console/core/aspect/StreamParkAspect.java      |   6 +-
 .../core/service/alert/impl/AlertServiceImpl.java  |  19 ++-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  42 +++--
 .../core/service/impl/ApplicationServiceImpl.java  |  57 ++++---
 .../core/service/impl/FlinkClusterServiceImpl.java |  27 +--
 .../core/service/impl/ProjectServiceImpl.java      |  23 +--
 .../core/service/impl/SavePointServiceImpl.java    |  26 +--
 .../console/core/task/CheckpointProcessor.java     |   4 +-
 ...ESTAPIWatcher.java => FlinkAppHttpWatcher.java} | 186 +++++++++++----------
 .../core/task/FlinkK8sChangeEventListener.java     |  19 +--
 12 files changed, 228 insertions(+), 198 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 9400f688f..7be3bcbf3 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -222,11 +222,11 @@ object YarnUtils extends Logger {
    * @return
    */
   @throws[IOException]
-  def restRequest(url: String): String = {
+  def restRequest(url: String, timeout: Int = 5000): String = {
     if (url == null) return null
     url match {
       case u if u.matches("^http(|s)://.*") =>
-        Try(request(url)) match {
+        Try(request(url, timeout)) match {
           case Success(v) => v
           case Failure(e) =>
             if (hasYarnHttpKerberosAuth) {
@@ -236,11 +236,11 @@ object YarnUtils extends Logger {
             }
         }
       case _ =>
-        Try(request(s"${getRMWebAppURL()}/$url")) match {
+        Try(request(s"${getRMWebAppURL()}/$url", timeout)) match {
           case Success(v) => v
           case Failure(_) =>
             Utils.retry[String](5) {
-              request(s"${getRMWebAppURL(true)}/$url")
+              request(s"${getRMWebAppURL(true)}/$url", timeout)
             } match {
               case Success(v) => v
               case Failure(e) =>
@@ -250,8 +250,8 @@ object YarnUtils extends Logger {
     }
   }
 
-  private[this] def request(reqUrl: String): String = {
-    val config = RequestConfig.custom.setConnectTimeout(5000).build
+  private[this] def request(reqUrl: String, timeout: Int): String = {
+    val config = RequestConfig.custom.setConnectTimeout(timeout).build
     if (hasYarnHttpKerberosAuth) {
       HadoopUtils
         .getUgi()
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
index 345a361c7..cfb2a5563 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.core.annotation;
 
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
+
 import org.aspectj.lang.ProceedingJoinPoint;
 
 import java.lang.annotation.ElementType;
@@ -29,8 +31,7 @@ import java.lang.annotation.Target;
  * application state update, need to add this annotation, This annotation 
marks which methods will
  * cause the application to be updated, Will work together with {@link
  * 
org.apache.streampark.console.core.aspect.StreamParkAspect#appUpdated(ProceedingJoinPoint)},
 The
- * final purpose will be refresh {@link
- * org.apache.streampark.console.core.task.FlinkRESTAPIWatcher#WATCHING_APPS}, 
Make the state of the
+ * final purpose will be refresh {@link FlinkAppHttpWatcher#WATCHING_APPS}, 
Make the state of the
  * job consistent with the database
  */
 @Target(ElementType.METHOD)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 90f7eade0..f9f2e6372 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -26,7 +26,7 @@ import 
org.apache.streampark.console.core.enums.PermissionType;
 import org.apache.streampark.console.core.enums.UserType;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.ServiceHelper;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.system.entity.AccessToken;
 import org.apache.streampark.console.system.entity.Member;
 import org.apache.streampark.console.system.entity.User;
@@ -56,7 +56,7 @@ import java.util.Objects;
 @Aspect
 public class StreamParkAspect {
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
   @Autowired private ServiceHelper serviceHelper;
   @Autowired private MemberService memberService;
   @Autowired private ApplicationService applicationService;
@@ -91,7 +91,7 @@ public class StreamParkAspect {
     MethodSignature methodSignature = (MethodSignature) 
joinPoint.getSignature();
     log.debug("appUpdated aspect, method:{}", methodSignature.getName());
     Object target = joinPoint.proceed();
-    flinkRESTAPIWatcher.init();
+    flinkAppHttpWatcher.initialize();
     return target;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 19a40249e..b08b0d2cf 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.alert.impl;
 
+import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.AlertException;
 import org.apache.streampark.console.base.util.SpringContextUtils;
@@ -39,22 +40,35 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Service
 public class AlertServiceImpl implements AlertService {
   @Autowired private AlertConfigService alertConfigService;
 
+  private final ExecutorService notifyExecutor =
+      new ThreadPoolExecutor(
+          1,
+          2,
+          20L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-notify"));
+
   @Override
   public void alert(Application application, CheckPointStatus 
checkPointStatus) {
     AlertTemplate alertTemplate = AlertTemplate.of(application, 
checkPointStatus);
-    alert(application, alertTemplate);
+    notifyExecutor.submit(() -> alert(application, alertTemplate));
   }
 
   @Override
   public void alert(Application application, FlinkAppState appState) {
     AlertTemplate alertTemplate = AlertTemplate.of(application, appState);
-    alert(application, alertTemplate);
+    notifyExecutor.submit(() -> alert(application, alertTemplate));
   }
 
   private void alert(Application application, AlertTemplate alertTemplate) {
@@ -73,6 +87,7 @@ public class AlertServiceImpl implements AlertService {
   @Override
   public boolean alert(AlertConfigWithParams params, AlertTemplate 
alertTemplate)
       throws AlertException {
+
     List<AlertType> alertTypes = AlertType.decode(params.getAlertType());
     if (CollectionUtils.isEmpty(alertTypes)) {
       return true;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 00f46799e..b735797c0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -51,7 +51,7 @@ import 
org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
@@ -89,6 +89,7 @@ import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
 
 import java.io.File;
 import java.io.IOException;
@@ -131,20 +132,10 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   @Autowired private ApplicationConfigService applicationConfigService;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   private static final Cache<Long, DockerPullSnapshot> 
DOCKER_PULL_PG_SNAPSHOTS =
       Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
 
@@ -154,6 +145,22 @@ public class AppBuildPipeServiceImpl
   private static final Cache<Long, DockerPushSnapshot> 
DOCKER_PUSH_PG_SNAPSHOTS =
       Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
 
+  private static final int CPU_NUM = Math.max(4, 
Runtime.getRuntime().availableProcessors() * 2);
+
+  private final ExecutorService buildPipelineExecutor =
+      new ThreadPoolExecutor(
+          1,
+          CPU_NUM,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-flink-buildPipeline"));
+
+  @PreDestroy
+  public void shutdown() {
+    buildPipelineExecutor.shutdown();
+  }
+
   @Override
   public boolean buildApplication(@Nonnull Application app, ApplicationLog 
applicationLog) {
     // 1) flink sql setDependency
@@ -183,8 +190,8 @@ public class AppBuildPipeServiceImpl
             app.setRelease(ReleaseState.RELEASING.get());
             applicationService.updateRelease(app);
 
-            if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
-              flinkRESTAPIWatcher.init();
+            if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+              flinkAppHttpWatcher.initialize();
             }
 
             // 1) checkEnv
@@ -257,8 +264,8 @@ public class AppBuildPipeServiceImpl
             }
             applicationService.updateRelease(app);
             applicationLogService.save(applicationLog);
-            if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
-              flinkRESTAPIWatcher.init();
+            if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
+              flinkAppHttpWatcher.initialize();
             }
           }
         });
@@ -291,8 +298,7 @@ public class AppBuildPipeServiceImpl
     DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
     DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
     DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
-    // async release pipeline
-    executorService.submit((Runnable) pipeline::launch);
+    buildPipelineExecutor.submit(pipeline::launch);
     return saved;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index bd189cdbe..8360f7439 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,8 +78,8 @@ import 
org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -126,6 +126,7 @@ import org.springframework.web.multipart.MultipartFile;
 
 import javax.annotation.Nonnull;
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
 import java.io.File;
 import java.io.IOException;
@@ -172,16 +173,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-deploy-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   private static final Pattern JOB_NAME_PATTERN =
       Pattern.compile("^[.\\x{4e00}-\\x{9fa5}A-Za-z\\d_\\-\\s]+$");
 
@@ -221,11 +212,27 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
 
+  private static final int CPU_NUM = Math.max(2, 
Runtime.getRuntime().availableProcessors() * 4);
+
+  private final ExecutorService bootstrapExecutor =
+      new ThreadPoolExecutor(
+          1,
+          CPU_NUM,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-flink-app-bootstrap"));
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
   }
 
+  @PreDestroy
+  public void shutdown() {
+    bootstrapExecutor.shutdown();
+  }
+
   private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
       new ConcurrentHashMap<>();
 
@@ -243,7 +250,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     Integer runningJob = 0;
 
     // stat metrics from other than kubernetes mode
-    for (Application app : FlinkRESTAPIWatcher.getWatchingApps()) {
+    for (Application app : FlinkAppHttpWatcher.getWatchingApps()) {
       if (!teamId.equals(app.getTeamId())) {
         continue;
       }
@@ -450,7 +457,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (!FlinkAppState.CANCELED.equals(state)) {
       return false;
     }
-    long cancelUserId = FlinkRESTAPIWatcher.getCanceledJobUserId(appId);
+    long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
     long appUserId = application.getUserId();
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
@@ -544,7 +551,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (exists) {
       return true;
     }
-    for (Application application : FlinkRESTAPIWatcher.getWatchingApps()) {
+    for (Application application : FlinkAppHttpWatcher.getWatchingApps()) {
       if (clusterId.equals(application.getFlinkClusterId())
           && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) 
{
         return true;
@@ -1238,14 +1245,14 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (application.isKubernetesModeJob()) {
       flinkK8sWatcher.doWatching(k8sWatcherWrapper.toTrackId(application));
     } else {
-      FlinkRESTAPIWatcher.doWatching(application);
+      FlinkAppHttpWatcher.doWatching(application);
     }
     return mapping;
   }
 
   @Override
   public void cancel(Application appParam) throws Exception {
-    FlinkRESTAPIWatcher.setOptionState(appParam.getId(), 
OptionState.CANCELLING);
+    FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionState.CANCELLING);
     Application application = getById(appParam.getId());
     application.setState(FlinkAppState.CANCELLING.getValue());
 
@@ -1257,7 +1264,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     applicationLog.setYarnAppId(application.getClusterId());
 
     if (appParam.getSavePointed()) {
-      FlinkRESTAPIWatcher.addSavepoint(application.getId());
+      FlinkAppHttpWatcher.addSavepoint(application.getId());
       application.setOptionState(OptionState.SAVEPOINTING.getValue());
     } else {
       application.setOptionState(OptionState.CANCELLING.getValue());
@@ -1268,7 +1275,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     Long userId = serviceHelper.getUserId();
     if (!application.getUserId().equals(userId)) {
-      FlinkRESTAPIWatcher.addCanceledApp(application.getId(), userId);
+      FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
     }
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -1315,7 +1322,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     final Date triggerTime = new Date();
     CompletableFuture<CancelResponse> cancelFuture =
-        CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), 
executorService);
+        CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), 
bootstrapExecutor);
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
@@ -1347,7 +1354,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               if (application.isKubernetesModeJob()) {
                 flinkK8sWatcher.unWatching(trackId);
               } else {
-                FlinkRESTAPIWatcher.unWatching(application.getId());
+                FlinkAppHttpWatcher.unWatching(application.getId());
               }
             }
             return;
@@ -1604,7 +1611,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             exposedType);
 
     CompletableFuture<SubmitResponse> future =
-        CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), 
executorService);
+        CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), 
bootstrapExecutor);
 
     startFutureMap.put(application.getId(), future);
 
@@ -1631,7 +1638,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 TrackId trackId = k8sWatcherWrapper.toTrackId(application);
                 flinkK8sWatcher.unWatching(trackId);
               } else {
-                FlinkRESTAPIWatcher.unWatching(appParam.getId());
+                FlinkAppHttpWatcher.unWatching(appParam.getId());
               }
             }
             return;
@@ -1692,8 +1699,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               }
             }
           } else {
-            FlinkRESTAPIWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
-            FlinkRESTAPIWatcher.doWatching(application);
+            FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
+            FlinkAppHttpWatcher.doWatching(application);
           }
           // update app
           updateById(application);
@@ -1786,7 +1793,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       TrackId id = k8sWatcherWrapper.toTrackId(application);
       flinkK8sWatcher.doWatching(id);
     } else {
-      FlinkRESTAPIWatcher.unWatching(application.getId());
+      FlinkAppHttpWatcher.unWatching(application.getId());
     }
     // kill application
     if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 88690f50e..1240313d9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -73,16 +73,6 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   private static final String ERROR_CLUSTER_QUEUE_HINT =
       "Queue label '%s' isn't available in database, please add it first.";
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-cluster-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   @Autowired private FlinkEnvService flinkEnvService;
 
   @Autowired private ServiceHelper serviceHelper;
@@ -93,6 +83,17 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private SettingService settingService;
 
+  private static final int CPU_NUM = Math.max(4, 
Runtime.getRuntime().availableProcessors() * 2);
+
+  private final ExecutorService bootstrapExecutor =
+      new ThreadPoolExecutor(
+          1,
+          CPU_NUM,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-flink-cluster-bootstrap"));
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();
@@ -169,10 +170,10 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     try {
       // 1) deployRequest
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
-      log.info("deploy cluster request: {}", deployRequest);
 
+      log.info("deploy cluster request: {}", deployRequest);
       Future<DeployResponse> future =
-          executorService.submit(() -> FlinkClient.deploy(deployRequest));
+          bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
       if (deployResponse.error() != null) {
         throw new ApiDetailException(
@@ -311,7 +312,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     DeployRequest deployRequest = getDeployRequest(flinkCluster);
     try {
       Future<ShutDownResponse> future =
-          executorService.submit(() -> FlinkClient.shutdown(deployRequest));
+          bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
       if (shutDownResponse.error() != null) {
         throw new ApiDetailException(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 931dd712f..0285329bb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -38,7 +38,7 @@ import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.mapper.ProjectMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.ProjectService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.ProjectBuildTask;
 
 import org.apache.flink.configuration.MemorySize;
@@ -80,17 +80,18 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
 
   @Autowired private ApplicationService applicationService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
-  private final ExecutorService executorService =
+  private static final int CPU_NUM = Math.max(4, 
Runtime.getRuntime().availableProcessors() * 2);
+
+  private final ExecutorService projectBuildExecutor =
       new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
+          1,
+          CPU_NUM,
           60L,
           TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-project-build"));
 
   @Override
   public RestResponse create(Project project) {
@@ -204,7 +205,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
               if (buildState == BuildState.SUCCESSFUL) {
                 baseMapper.updateBuildTime(id);
               }
-              flinkRESTAPIWatcher.init();
+              flinkAppHttpWatcher.initialize();
             },
             fileLogger -> {
               List<Application> applications =
@@ -219,10 +220,10 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
                     app.setBuild(true);
                     this.applicationService.updateRelease(app);
                   });
-              flinkRESTAPIWatcher.init();
+              flinkAppHttpWatcher.initialize();
             });
     CompletableFuture<Void> buildTask =
-        CompletableFuture.runAsync(projectBuildTask, executorService);
+        CompletableFuture.runAsync(projectBuildTask, projectBuildExecutor);
     // TODO May need to define parameters to set the build timeout in the 
future.
     CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 8dde05c8b..3f132f014 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -43,7 +43,7 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavePointService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
+import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.SavepointResponse;
 import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
@@ -93,17 +93,18 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
-  private final ExecutorService executorService =
+  private static final int CPU_NUM = Math.max(2, 
Runtime.getRuntime().availableProcessors());
+
+  private final ExecutorService flinkTriggerExecutor =
       new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
+          1,
+          CPU_NUM,
           60L,
           TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("trigger-savepoint-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));
 
   @Override
   public void expire(Long appId) {
@@ -288,12 +289,12 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     applicationLog.setOptionTime(new Date());
     applicationLog.setYarnAppId(application.getClusterId());
 
-    FlinkRESTAPIWatcher.addSavepoint(application.getId());
+    FlinkAppHttpWatcher.addSavepoint(application.getId());
 
     application.setOptionState(OptionState.SAVEPOINTING.getValue());
     application.setOptionTime(new Date());
     this.applicationService.updateById(application);
-    flinkRESTAPIWatcher.init();
+    flinkAppHttpWatcher.initialize();
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
@@ -316,7 +317,8 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
             application.getK8sNamespace());
 
     CompletableFuture<SavepointResponse> savepointFuture =
-        CompletableFuture.supplyAsync(() -> 
FlinkClient.triggerSavepoint(request), executorService);
+        CompletableFuture.supplyAsync(
+            () -> FlinkClient.triggerSavepoint(request), flinkTriggerExecutor);
 
     handleSavepointResponseFuture(application, applicationLog, 
savepointFuture);
   }
@@ -360,7 +362,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
               application.setOptionState(OptionState.NONE.getValue());
               application.setOptionTime(new Date());
               applicationService.update(application);
-              flinkRESTAPIWatcher.init();
+              flinkAppHttpWatcher.initialize();
             });
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 42b1a4137..092f6b6ae 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -70,7 +70,7 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
   public void process(Application application, @Nonnull CheckPoints 
checkPoints) {
     checkPoints.getLatestCheckpoint().forEach(checkPoint -> 
process(application, checkPoint));
@@ -86,7 +86,7 @@ public class CheckpointProcessor {
       if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
         savepointedCache.put(checkPointKey.getSavePointId(), 
DEFAULT_FLAG_BYTE);
         saveSavepoint(checkPoint, application.getId());
-        flinkRESTAPIWatcher.cleanSavepoint(application);
+        flinkAppHttpWatcher.cleanSavepoint(application);
         return;
       }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
similarity index 83%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index f1b1b5a61..9f406e655 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -68,7 +68,7 @@ import java.util.stream.Collectors;
 /** This implementation is currently used for tracing flink job on 
yarn,standalone,remote mode */
 @Slf4j
 @Component
-public class FlinkRESTAPIWatcher {
+public class FlinkAppHttpWatcher {
 
   @Autowired private ApplicationService applicationService;
 
@@ -85,6 +85,8 @@ public class FlinkRESTAPIWatcher {
   // option interval within 10 seconds
   private static final long OPTION_INTERVAL = 1000L * 10;
 
+  private static final int HTTP_TIMEOUT = 2000;
+
   /**
    *
    *
@@ -142,17 +144,19 @@ public class FlinkRESTAPIWatcher {
 
   private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
 
-  private static final ExecutorService EXECUTOR =
+  private static final int CPU_NUM = Math.max(4, 
Runtime.getRuntime().availableProcessors() * 2);
+
+  private static final ExecutorService watchExecutor =
       new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
+          CPU_NUM,
+          CPU_NUM * 5,
           60L,
           TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("flink-restapi-watching-executor"));
+          new LinkedBlockingQueue<>(),
+          ThreadUtils.threadFactory("streampark-flink-app-watching"));
 
   @PostConstruct
-  public void init() {
+  public void initialize() {
     WATCHING_APPS.clear();
     List<Application> applications =
         applicationService.list(
@@ -165,7 +169,7 @@ public class FlinkRESTAPIWatcher {
   @PreDestroy
   public void doStop() {
     log.info(
-        "FlinkRESTAPIWatcher StreamPark Console will be shutdown,persistent 
application to database.");
+        "[StreamPark][FlinkAppHttpWatcher] StreamPark Console will be shut down, persisting applications to database.");
     WATCHING_APPS.forEach((k, v) -> applicationService.persistMetrics(v));
   }
 
@@ -182,90 +186,87 @@ public class FlinkRESTAPIWatcher {
     // The application has been started at the first time, or the front-end is 
operating start/stop,
     // need to return status info immediately.
     if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
-      doWatch();
+      watch();
     } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) 
{
       // The last operation time is less than option interval.(10 seconds)
-      doWatch();
+      watch();
     } else if (System.currentTimeMillis() - lastWatchingTime >= 
WATCHING_INTERVAL) {
       // Normal information obtain, check if there is 5 seconds interval 
between this time and the
       // last time.(once every 5 seconds)
-      doWatch();
+      watch();
     }
   }
 
-  private void doWatch() {
+  private void watch() {
     lastWatchingTime = System.currentTimeMillis();
-    for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
-      watch(entry.getKey(), entry.getValue());
+    for (Application application : WATCHING_APPS.values()) {
+      watchExecutor.execute(
+          () -> {
+            try {
+              // query status from flink rest api
+              getFromFlinkRestApi(application);
+            } catch (Exception flinkException) {
+              // query status from yarn rest api
+              try {
+                getFromYarnRestApi(application);
+              } catch (Exception yarnException) {
+                doStateFailed(application);
+              }
+            }
+          });
     }
   }
 
-  private void watch(Long key, Application application) {
-    EXECUTOR.execute(
-        () -> {
-          final StopFrom stopFrom =
-              STOP_FROM_MAP.getOrDefault(key, null) == null
-                  ? StopFrom.NONE
-                  : STOP_FROM_MAP.get(key);
-          final OptionState optionState = OPTIONING.get(key);
+  private void doStateFailed(Application application) {
+    /*
+     Query from flink's restAPI and yarn's restAPI both failed.
+     In this case, it is necessary to decide whether to return to the final 
state depending on the state being operated
+    */
+    StopFrom stopFrom = getStopFrom(application);
+    OptionState optionState = OPTIONING.get(application.getId());
+    if (optionState == null || !optionState.equals(OptionState.STARTING)) {
+      // non-mapping
+      if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+        log.error(
+            "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and getFromYarnRestApi error, job failed, savepoint expired!");
+        if (StopFrom.NONE.equals(stopFrom)) {
+          savePointService.expire(application.getId());
+          application.setState(FlinkAppState.LOST.getValue());
+          alertService.alert(application, FlinkAppState.LOST);
+        } else {
+          application.setState(FlinkAppState.CANCELED.getValue());
+        }
+      }
+      /*
+       This step means that the above two ways to get information have failed, 
and this step is the last step,
+       which will directly mark the job as cancelled or lost.
+       Need to clean the savepoint.
+      */
+      application.setEndTime(new Date());
+      cleanSavepoint(application);
+      cleanOptioning(optionState, application.getId());
+      doPersistMetrics(application, true);
+      FlinkAppState appState = FlinkAppState.of(application.getState());
+      if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
+        alertService.alert(application, 
FlinkAppState.of(application.getState()));
+        if (appState.equals(FlinkAppState.FAILED)) {
           try {
-            // query status from flink rest api
-            getFromFlinkRestApi(application, stopFrom);
-          } catch (Exception flinkException) {
-            // query status from yarn rest api
-            try {
-              getFromYarnRestApi(application, stopFrom);
-            } catch (Exception yarnException) {
-              /*
-               Query from flink's restAPI and yarn's restAPI both failed.
-               In this case, it is necessary to decide whether to return to 
the final state depending on the state being operated
-              */
-              if (optionState == null || 
!optionState.equals(OptionState.STARTING)) {
-                // non-mapping
-                if (application.getState() != 
FlinkAppState.MAPPING.getValue()) {
-                  log.error(
-                      "FlinkRESTAPIWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
-                  if (StopFrom.NONE.equals(stopFrom)) {
-                    savePointService.expire(application.getId());
-                    application.setState(FlinkAppState.LOST.getValue());
-                    alertService.alert(application, FlinkAppState.LOST);
-                  } else {
-                    application.setState(FlinkAppState.CANCELED.getValue());
-                  }
-                }
-                /*
-                 This step means that the above two ways to get information 
have failed, and this step is the last step,
-                 which will directly identify the mission as cancelled or lost.
-                 Need clean savepoint.
-                */
-                application.setEndTime(new Date());
-                cleanSavepoint(application);
-                cleanOptioning(optionState, key);
-                doPersistMetrics(application, true);
-                FlinkAppState appState = 
FlinkAppState.of(application.getState());
-                if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
-                  alertService.alert(application, 
FlinkAppState.of(application.getState()));
-                  if (appState.equals(FlinkAppState.FAILED)) {
-                    try {
-                      applicationService.start(application, true);
-                    } catch (Exception e) {
-                      log.error(e.getMessage(), e);
-                    }
-                  }
-                }
-              }
-            }
+            applicationService.start(application, true);
+          } catch (Exception e) {
+            log.error(e.getMessage(), e);
           }
-        });
+        }
+      }
+    }
   }
 
   /**
    * Get the current task running status information from flink restapi
    *
    * @param application application
-   * @param stopFrom stopFrom
    */
-  private void getFromFlinkRestApi(Application application, StopFrom stopFrom) 
throws Exception {
+  private void getFromFlinkRestApi(Application application) throws Exception {
+    StopFrom stopFrom = getStopFrom(application);
     JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
@@ -312,6 +313,12 @@ public class FlinkRESTAPIWatcher {
     }
   }
 
+  private StopFrom getStopFrom(Application application) {
+    return STOP_FROM_MAP.getOrDefault(application.getId(), null) == null
+        ? StopFrom.NONE
+        : STOP_FROM_MAP.get(application.getId());
+  }
+
   /**
    * handle job overview
    *
@@ -445,14 +452,14 @@ public class FlinkRESTAPIWatcher {
         break;
       case CANCELED:
         log.info(
-            "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {}, stop 
tracking and delete stopFrom!",
+            "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state 
{}, stop tracking and delete stopFrom!",
             currentState.name());
         cleanSavepoint(application);
         application.setState(currentState.getValue());
         if (StopFrom.NONE.equals(stopFrom) || 
applicationService.checkAlter(application)) {
           if (StopFrom.NONE.equals(stopFrom)) {
             log.info(
-                "FlinkRESTAPIWatcher getFromFlinkRestApi, job cancel is not 
form StreamPark,savePoint expired!");
+                "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job cancel is not from StreamPark, savepoint expired!");
             savePointService.expire(application.getId());
           }
           stopCanceledJob(application.getId());
@@ -472,7 +479,7 @@ public class FlinkRESTAPIWatcher {
         break;
       case RESTARTING:
         log.info(
-            "FlinkRESTAPIWatcher getFromFlinkRestApi, job state {},add to 
starting",
+            "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, add to starting",
             currentState.name());
         STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
         break;
@@ -487,10 +494,10 @@ public class FlinkRESTAPIWatcher {
    * status of the task is CANCELED</strong>
    *
    * @param application application
-   * @param stopFrom stopFrom
    */
-  private void getFromYarnRestApi(Application application, StopFrom stopFrom) 
throws Exception {
-    log.debug("FlinkRESTAPIWatcher getFromYarnRestApi starting...");
+  private void getFromYarnRestApi(Application application) throws Exception {
+    log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi 
starting...");
+    StopFrom stopFrom = getStopFrom(application);
     OptionState optionState = OPTIONING.get(application.getId());
 
     /*
@@ -500,7 +507,7 @@ public class FlinkRESTAPIWatcher {
     */
     Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
     if (flag != null) {
-      log.info("FlinkRESTAPIWatcher previous state: canceling.");
+      log.info("[StreamPark][FlinkAppHttpWatcher] previous state: canceling.");
       FlinkAppState flinkAppState = FlinkAppState.CANCELED;
       try {
         YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
@@ -511,7 +518,7 @@ public class FlinkRESTAPIWatcher {
       } finally {
         if (StopFrom.NONE.equals(stopFrom)) {
           log.error(
-              "FlinkRESTAPIWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
+              "[StreamPark][FlinkAppHttpWatcher] query previous state was canceling and stopFrom not found, savepoint expired!");
           savePointService.expire(application.getId());
           if (flinkAppState == FlinkAppState.KILLED || flinkAppState == 
FlinkAppState.FAILED) {
             alertService.alert(application, flinkAppState);
@@ -528,7 +535,8 @@ public class FlinkRESTAPIWatcher {
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
       if (yarnAppInfo == null) {
         if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-          throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi 
failed ");
+          throw new RuntimeException(
+              "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi failed");
         }
       } else {
         try {
@@ -540,7 +548,7 @@ public class FlinkRESTAPIWatcher {
           if (FlinkAppState.KILLED.equals(flinkAppState)) {
             if (StopFrom.NONE.equals(stopFrom)) {
               log.error(
-                  "FlinkRESTAPIWatcher getFromYarnRestApi,job was killed and 
stopFrom NotFound,savePoint expired!");
+                  "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi, job was killed and stopFrom not found, savepoint expired!");
               savePointService.expire(application.getId());
             }
             flinkAppState = FlinkAppState.CANCELED;
@@ -565,7 +573,8 @@ public class FlinkRESTAPIWatcher {
           }
         } catch (Exception e) {
           if 
(!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
-            throw new RuntimeException("FlinkRESTAPIWatcher getFromYarnRestApi 
error,", e);
+            throw new RuntimeException(
+                "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi error,", 
e);
           }
         }
       }
@@ -589,7 +598,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher setOptioning");
+    log.info("[StreamPark][FlinkAppHttpWatcher] setOptioning");
     OPTIONING.put(appId, state);
     if (state.equals(OptionState.CANCELLING)) {
       STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
@@ -600,7 +609,7 @@ public class FlinkRESTAPIWatcher {
     if (application.isKubernetesModeJob()) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}", 
application.getId());
+    log.info("[StreamPark][FlinkAppHttpWatcher] add app to tracking,appId:{}", 
application.getId());
     WATCHING_APPS.put(application.getId(), application);
     STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
   }
@@ -609,7 +618,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher add app to savepoint,appId:{}", appId);
+    log.info("[StreamPark][FlinkAppHttpWatcher] add app to 
savepoint,appId:{}", appId);
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }
 
@@ -617,7 +626,7 @@ public class FlinkRESTAPIWatcher {
     if (isKubernetesApp(appId)) {
       return;
     }
-    log.info("FlinkRESTAPIWatcher stop app,appId:{}", appId);
+    log.info("[StreamPark][FlinkAppHttpWatcher] stop app,appId:{}", appId);
     WATCHING_APPS.remove(appId);
   }
 
@@ -736,13 +745,14 @@ public class FlinkRESTAPIWatcher {
   }
 
   private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException 
{
-    String result = YarnUtils.restRequest(url);
+    String result = YarnUtils.restRequest(url, HTTP_TIMEOUT);
     return JacksonUtils.read(result, clazz);
   }
 
   private <T> T httpRestRequest(String url, Class<T> clazz) throws IOException 
{
     String result =
-        HttpClientUtils.httpGetRequest(url, 
RequestConfig.custom().setConnectTimeout(5000).build());
+        HttpClientUtils.httpGetRequest(
+            url, 
RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build());
     if (null == result) {
       return null;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 9cc077718..55053f1f0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.FlinkAppState;
 import org.apache.streampark.console.core.enums.OptionState;
@@ -37,15 +36,12 @@ import 
org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import scala.Enumeration;
 
@@ -54,6 +50,7 @@ import static 
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8
 
 /** Event Listener for K8sFlinkTrackMonitor */
 @Component
+@Slf4j
 public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private ApplicationService applicationService;
@@ -62,16 +59,6 @@ public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private CheckpointProcessor checkpointProcessor;
 
-  private final ExecutorService executor =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          20L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-notify-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   /**
    * Catch FlinkJobStatusChangeEvent then storage it persistently to db. 
Actually update
    * org.apache.streampark.console.core.entity.Application records.
@@ -98,7 +85,7 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
         || FlinkAppState.FINISHED.equals(state)) {
-      executor.execute(() -> alertService.alert(app, state));
+      alertService.alert(app, state);
     }
   }
 

Reply via email to