This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new f71549b30 [Improve#3068] Improve streampark-console module base on [3.5 Concurrent Processing] (#3435)
f71549b30 is described below

commit f71549b30a61850a6be7b7cf250a42a3d99030d6
Author: zhengke zhou <[email protected]>
AuthorDate: Sat Dec 30 10:15:51 2023 +0800

    [Improve#3068] Improve streampark-console module base on [3.5 Concurrent Processing] (#3435)
    
    * [Improve] Unified management of thread pools
    
    * [Improve] replace thread pool in ApplicationActionServiceImpl
    
    * [Improve] replace thread pool in FlinkAppHttpWatcher
    
    * [Improve] replace thread pool in FlinkClusterWatcher
    
    * [Improve] replace thread pool in ProjectServiceImpl
    
    * [Improve] replace thread pool in SavePointServiceImpl
    
    * [Improve] replace thread pool in FlinkK8sChangeEventListener
    
    * [Improve] replace thread pool in FlinkClusterServiceImpl AppBuildPipeServiceImpl
---
 .../base/config/AsyncExecutorPoolConfig.java       | 105 +++++++++++++++++++++
 .../impl/ApplicationActionServiceImpl.java         |  19 +---
 .../core/service/impl/AppBuildPipeServiceImpl.java |  16 +---
 .../core/service/impl/FlinkClusterServiceImpl.java |  16 +---
 .../core/service/impl/ProjectServiceImpl.java      |  18 +---
 .../core/service/impl/SavePointServiceImpl.java    |  18 +---
 .../console/core/watcher/FlinkAppHttpWatcher.java  |  21 ++---
 .../console/core/watcher/FlinkClusterWatcher.java  |  22 ++---
 .../core/watcher/FlinkK8sChangeEventListener.java  |  22 ++---
 9 files changed, 149 insertions(+), 108 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index b50ffe289..5222c0cfb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -17,13 +17,18 @@
 
 package org.apache.streampark.console.base.config;
 
+import org.apache.streampark.common.util.ThreadUtils;
+
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 @Configuration
 public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {
@@ -40,4 +45,104 @@ public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {
     executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     return executor;
   }
+
+  @Bean("triggerSavepointExecutor")
+  public Executor savepointExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(60);
+    executor.setThreadNamePrefix("trigger-savepoint-executor-");
+    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+    return executor;
+  }
+
+  @Bean("flinkRestAPIWatchingExecutor")
+  public Executor restAPIWatchingExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(60);
+    executor.setThreadNamePrefix("flink-restapi-watching-executor-");
+    return executor;
+  }
+
+  @Bean("flinkClusterWatchingExecutor")
+  public Executor clusterWatchingExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(60);
+    executor.setThreadNamePrefix("flink-cluster-watching-executor-");
+    return executor;
+  }
+
+  @Bean("streamparkBuildPipelineExecutor")
+  public ExecutorService pipelineExecutor() {
+    return new ThreadPoolExecutor(
+        Runtime.getRuntime().availableProcessors() * 5,
+        Runtime.getRuntime().availableProcessors() * 10,
+        60L,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024),
+        ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
+        new ThreadPoolExecutor.AbortPolicy());
+  }
+
+  @Bean("streamparkClusterExecutor")
+  public ExecutorService clusterExecutor() {
+    return new ThreadPoolExecutor(
+        Runtime.getRuntime().availableProcessors() * 5,
+        Runtime.getRuntime().availableProcessors() * 10,
+        60L,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024),
+        ThreadUtils.threadFactory("streampark-cluster-executor"),
+        new ThreadPoolExecutor.AbortPolicy());
+  }
+
+  @Bean("streamparkNotifyExecutor")
+  public Executor notifyExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(20);
+    executor.setThreadNamePrefix("streampark-notify-executor-");
+    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+    return executor;
+  }
+
+  @Bean("streamparkDeployExecutor")
+  public Executor deployExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(60);
+    executor.setThreadNamePrefix("streampark-deploy-executor-");
+    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+    return executor;
+  }
+
+  @Bean("streamparkBuildExecutor")
+  public Executor buildExecutor() {
+    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+    executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+    executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+    executor.setQueueCapacity(1024);
+    executor.setKeepAliveSeconds(60);
+    executor.setThreadNamePrefix("streampark-build-executor-");
+    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+    return executor;
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 963a92d45..86384bab2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -31,7 +31,6 @@ import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
@@ -101,6 +100,7 @@ import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -117,10 +117,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId;
 import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp;
@@ -131,15 +128,9 @@ import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.
 public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, Application>
     implements ApplicationActionService {
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-deploy-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("streamparkDeployExecutor")
+  @Autowired
+  private Executor executorService;
 
   @Autowired private ApplicationBackUpService backUpService;
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 36111230e..6d11c21ea 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.util.JacksonUtils;
@@ -96,6 +95,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -111,8 +111,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -149,15 +147,9 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ResourceService resourceService;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("streamparkBuildPipelineExecutor")
+  @Autowired
+  private ExecutorService executorService;
 
   private static final Cache<Long, DockerPullSnapshot> DOCKER_PULL_PG_SNAPSHOTS =
       Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index b85313126..9400da6d9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -50,6 +49,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -63,8 +63,6 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -78,15 +76,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   private static final String ERROR_CLUSTER_QUEUE_HINT =
       "Queue label '%s' isn't available in database, please add it first.";
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-cluster-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("streamparkClusterExecutor")
+  @Autowired
+  private ExecutorService executorService;
 
   @Autowired private FlinkEnvService flinkEnvService;
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 55ca1dc53..b8abe3fc7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -50,6 +49,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -68,9 +68,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -85,15 +83,9 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
 
   @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("streamparkBuildExecutor")
+  @Autowired
+  private Executor executorService;
 
   @Override
   public RestResponse create(Project project) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 6d71af900..87052ad8f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.Constant;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -61,6 +60,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -74,9 +74,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -105,15 +103,9 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private CommonServiceImpl commonService;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("trigger-savepoint-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("triggerSavepointExecutor")
+  @Autowired
+  private Executor executorService;
 
   @Override
   public void expire(Long appId) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8d200d530..b4bf9c8b5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.watcher;
 
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
@@ -52,6 +51,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -68,9 +68,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -95,6 +93,10 @@ public class FlinkAppHttpWatcher {
 
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
+  @Qualifier("flinkRestAPIWatchingExecutor")
+  @Autowired
+  private Executor executorService;
+
   // track interval  every 5 seconds
   public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
 
@@ -158,15 +160,6 @@ public class FlinkAppHttpWatcher {
 
   private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
 
-  private static final ExecutorService EXECUTOR =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("flink-restapi-watching-executor"));
-
   @PostConstruct
   public void init() {
     WATCHING_APPS.clear();
@@ -217,7 +210,7 @@ public class FlinkAppHttpWatcher {
   }
 
   private void watch(Long id, Application application) {
-    EXECUTOR.execute(
+    executorService.execute(
         () -> {
           try {
             // query status from flink rest api
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index 421668303..dada561c0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.core.bean.AlertTemplate;
@@ -43,6 +42,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -53,9 +53,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
@@ -69,6 +67,10 @@ public class FlinkClusterWatcher {
 
   @Autowired private ApplicationInfoService applicationInfoService;
 
+  @Qualifier("flinkClusterWatchingExecutor")
+  @Autowired
+  private Executor executorService;
+
   private Long lastWatchTime = 0L;
 
   // Track interval  every 30 seconds
@@ -82,16 +84,6 @@ public class FlinkClusterWatcher {
 
   private boolean immediateWatch = false;
 
-  /** Thread pool for processing status monitoring for each cluster */
-  private static final ExecutorService EXECUTOR =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("flink-cluster-watching-executor"));
-
   /** Initialize cluster cache */
   @PostConstruct
   private void init() {
@@ -113,7 +105,7 @@ public class FlinkClusterWatcher {
       immediateWatch = false;
       WATCHER_CLUSTERS.forEach(
           (aLong, flinkCluster) ->
-              EXECUTOR.execute(
+              executorService.execute(
                   () -> {
                     ClusterState state = getClusterState(flinkCluster);
                     switch (state) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
index 358f356fc..0284b4346 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.watcher;
 
 import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
 import org.apache.streampark.console.core.entity.Application;
@@ -42,14 +41,12 @@ import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import scala.Enumeration;
 
@@ -69,21 +66,16 @@ import static org.apache.streampark.console.core.enums.FlinkAppStateEnum.Bridge.
 public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private ApplicationInfoService applicationInfoService;
 
   @Lazy @Autowired private AlertService alertService;
 
   @Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor;
 
-  private final ExecutorService executor =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          20L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-notify-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  @Qualifier("streamparkNotifyExecutor")
+  @Autowired
+  private Executor executorService;
 
   /**
    * Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update
@@ -110,7 +102,7 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppStateEnum.LOST == state
         || FlinkAppStateEnum.RESTARTING == state
         || FlinkAppStateEnum.FINISHED == state) {
-      executor.execute(
+      executorService.execute(
           () -> {
             if (app.getProbing()) {
               log.info("application with id {} is probing, don't send alert", app.getId());

Reply via email to