This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new f71549b30 [Improve#3068] Improve streampark-console module based on
[3.5 Concurrent Processing] (#3435)
f71549b30 is described below
commit f71549b30a61850a6be7b7cf250a42a3d99030d6
Author: zhengke zhou <[email protected]>
AuthorDate: Sat Dec 30 10:15:51 2023 +0800
[Improve#3068] Improve streampark-console module based on [3.5 Concurrent
Processing] (#3435)
* [Improve] Unified management of thread pools
* [Improve] replace thread pool in ApplicationActionServiceImpl
* [Improve] replace thread pool in FlinkAppHttpWatcher
* [Improve] replace thread pool in FlinkClusterWatcher
* [Improve] replace thread pool in ProjectServiceImpl
* [Improve] replace thread pool in SavePointServiceImpl
* [Improve] replace thread pool in FlinkK8sChangeEventListener
* [Improve] replace thread pool in FlinkClusterServiceImpl and AppBuildPipeServiceImpl
---
.../base/config/AsyncExecutorPoolConfig.java | 105 +++++++++++++++++++++
.../impl/ApplicationActionServiceImpl.java | 19 +---
.../core/service/impl/AppBuildPipeServiceImpl.java | 16 +---
.../core/service/impl/FlinkClusterServiceImpl.java | 16 +---
.../core/service/impl/ProjectServiceImpl.java | 18 +---
.../core/service/impl/SavePointServiceImpl.java | 18 +---
.../console/core/watcher/FlinkAppHttpWatcher.java | 21 ++---
.../console/core/watcher/FlinkClusterWatcher.java | 22 ++---
.../core/watcher/FlinkK8sChangeEventListener.java | 22 ++---
9 files changed, 149 insertions(+), 108 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
index b50ffe289..5222c0cfb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java
@@ -17,13 +17,18 @@
package org.apache.streampark.console.base.config;
+import org.apache.streampark.common.util.ThreadUtils;
+
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
@Configuration
public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {
@@ -40,4 +45,104 @@ public class AsyncExecutorPoolConfig extends
AsyncConfigurerSupport {
executor.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
+
+ @Bean("triggerSavepointExecutor")
+ public Executor savepointExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(60);
+ executor.setThreadNamePrefix("trigger-savepoint-executor-");
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ return executor;
+ }
+
+ @Bean("flinkRestAPIWatchingExecutor")
+ public Executor restAPIWatchingExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(60);
+ executor.setThreadNamePrefix("flink-restapi-watching-executor-");
+ return executor;
+ }
+
+ @Bean("flinkClusterWatchingExecutor")
+ public Executor clusterWatchingExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(60);
+ executor.setThreadNamePrefix("flink-cluster-watching-executor-");
+ return executor;
+ }
+
+ @Bean("streamparkBuildPipelineExecutor")
+ public ExecutorService pipelineExecutor() {
+ return new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 5,
+ Runtime.getRuntime().availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024),
+ ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ @Bean("streamparkClusterExecutor")
+ public ExecutorService clusterExecutor() {
+ return new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 5,
+ Runtime.getRuntime().availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024),
+ ThreadUtils.threadFactory("streampark-cluster-executor"),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ @Bean("streamparkNotifyExecutor")
+ public Executor notifyExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(20);
+ executor.setThreadNamePrefix("streampark-notify-executor-");
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ return executor;
+ }
+
+ @Bean("streamparkDeployExecutor")
+ public Executor deployExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(60);
+ executor.setThreadNamePrefix("streampark-deploy-executor-");
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ return executor;
+ }
+
+ @Bean("streamparkBuildExecutor")
+ public Executor buildExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+ executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
+ executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
+ executor.setQueueCapacity(1024);
+ executor.setKeepAliveSeconds(60);
+ executor.setThreadNamePrefix("streampark-build-executor-");
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ return executor;
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 963a92d45..86384bab2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -31,7 +31,6 @@ import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
@@ -101,6 +100,7 @@ import com.google.common.collect.Sets;
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -117,10 +117,7 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static
org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId;
import static
org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp;
@@ -131,15 +128,9 @@ import static
org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.
public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper, Application>
implements ApplicationActionService {
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-deploy-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("streamparkDeployExecutor")
+ @Autowired
+ private Executor executorService;
@Autowired private ApplicationBackUpService backUpService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 36111230e..6d11c21ea 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.util.JacksonUtils;
@@ -96,6 +95,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -111,8 +111,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -149,15 +147,9 @@ public class AppBuildPipeServiceImpl
@Autowired private ResourceService resourceService;
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("streamparkBuildPipelineExecutor")
+ @Autowired
+ private ExecutorService executorService;
private static final Cache<Long, DockerPullSnapshot>
DOCKER_PULL_PG_SNAPSHOTS =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index b85313126..9400da6d9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -50,6 +49,7 @@ import
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -63,8 +63,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -78,15 +76,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
private static final String ERROR_CLUSTER_QUEUE_HINT =
"Queue label '%s' isn't available in database, please add it first.";
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-cluster-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("streamparkClusterExecutor")
+ @Autowired
+ private ExecutorService executorService;
@Autowired private FlinkEnvService flinkEnvService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 55ca1dc53..b8abe3fc7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
@@ -50,6 +49,7 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -68,9 +68,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -85,15 +83,9 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-build-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("streamparkBuildExecutor")
+ @Autowired
+ private Executor executorService;
@Override
public RestResponse create(Project project) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 6d71af900..87052ad8f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.Constant;
import org.apache.streampark.console.base.domain.RestRequest;
@@ -61,6 +60,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -74,9 +74,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -105,15 +103,9 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Autowired private CommonServiceImpl commonService;
- private final ExecutorService executorService =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("trigger-savepoint-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("triggerSavepointExecutor")
+ @Autowired
+ private Executor executorService;
@Override
public void expire(Long appId) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8d200d530..b4bf9c8b5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
@@ -52,6 +51,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -68,9 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -95,6 +93,10 @@ public class FlinkAppHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
+ @Qualifier("flinkRestAPIWatchingExecutor")
+ @Autowired
+ private Executor executorService;
+
// track interval every 5 seconds
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
@@ -158,15 +160,6 @@ public class FlinkAppHttpWatcher {
private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
- private static final ExecutorService EXECUTOR =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("flink-restapi-watching-executor"));
-
@PostConstruct
public void init() {
WATCHING_APPS.clear();
@@ -217,7 +210,7 @@ public class FlinkAppHttpWatcher {
}
private void watch(Long id, Application application) {
- EXECUTOR.execute(
+ executorService.execute(
() -> {
try {
// query status from flink rest api
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index 421668303..dada561c0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.HttpClientUtils;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
@@ -43,6 +42,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -53,9 +53,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/** This implementation is currently used for tracing Cluster on
yarn,remote,K8s mode */
@@ -69,6 +67,10 @@ public class FlinkClusterWatcher {
@Autowired private ApplicationInfoService applicationInfoService;
+ @Qualifier("flinkClusterWatchingExecutor")
+ @Autowired
+ private Executor executorService;
+
private Long lastWatchTime = 0L;
// Track interval every 30 seconds
@@ -82,16 +84,6 @@ public class FlinkClusterWatcher {
private boolean immediateWatch = false;
- /** Thread pool for processing status monitoring for each cluster */
- private static final ExecutorService EXECUTOR =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("flink-cluster-watching-executor"));
-
/** Initialize cluster cache */
@PostConstruct
private void init() {
@@ -113,7 +105,7 @@ public class FlinkClusterWatcher {
immediateWatch = false;
WATCHER_CLUSTERS.forEach(
(aLong, flinkCluster) ->
- EXECUTOR.execute(
+ executorService.execute(
() -> {
ClusterState state = getClusterState(flinkCluster);
switch (state) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
index 358f356fc..0284b4346 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
import org.apache.streampark.console.core.entity.Application;
@@ -42,14 +41,12 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import scala.Enumeration;
@@ -69,21 +66,16 @@ import static
org.apache.streampark.console.core.enums.FlinkAppStateEnum.Bridge.
public class FlinkK8sChangeEventListener {
@Lazy @Autowired private ApplicationManageService applicationManageService;
+
@Autowired private ApplicationInfoService applicationInfoService;
@Lazy @Autowired private AlertService alertService;
@Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor;
- private final ExecutorService executor =
- new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 5,
- Runtime.getRuntime().availableProcessors() * 10,
- 20L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1024),
- ThreadUtils.threadFactory("streampark-notify-executor"),
- new ThreadPoolExecutor.AbortPolicy());
+ @Qualifier("streamparkNotifyExecutor")
+ @Autowired
+ private Executor executorService;
/**
* Catch FlinkJobStatusChangeEvent then storage it persistently to db.
Actually update
@@ -110,7 +102,7 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppStateEnum.LOST == state
|| FlinkAppStateEnum.RESTARTING == state
|| FlinkAppStateEnum.FINISHED == state) {
- executor.execute(
+ executorService.execute(
() -> {
if (app.getProbing()) {
log.info("application with id {} is probing, don't send alert",
app.getId());