This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch thread in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 0bd8255f79c0162930da7c1a0e436404c90571fd Author: benjobs <[email protected]> AuthorDate: Mon Feb 5 17:38:10 2024 +0800 [Improve] executors improvement --- .../core/service/impl/AppBuildPipeServiceImpl.java | 18 +++--------------- .../core/service/impl/ApplicationServiceImpl.java | 16 +++------------- .../core/service/impl/FlinkClusterServiceImpl.java | 18 ++++-------------- .../console/core/service/impl/ProjectServiceImpl.java | 17 ++--------------- .../core/service/impl/SavePointServiceImpl.java | 15 ++------------- .../console/core/task/FlinkK8sChangeEventListener.java | 17 ++++------------- .../console/core/task/FlinkRESTAPIWatcher.java | 15 ++++----------- 7 files changed, 22 insertions(+), 94 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 00f46799e..7b3b70058 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -22,7 +22,6 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.util.WebUtils; @@ -101,8 +100,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -135,16 +133,6 @@ public class AppBuildPipeServiceImpl @Autowired private ApplicationConfigService applicationConfigService; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-build-pipeline-executor"), - new ThreadPoolExecutor.AbortPolicy()); - private static final Cache<Long, DockerPullSnapshot> DOCKER_PULL_PG_SNAPSHOTS = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build(); @@ -291,8 +279,8 @@ public class AppBuildPipeServiceImpl DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId()); DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId()); DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId()); - // async release pipeline - executorService.submit((Runnable) pipeline::launch); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(pipeline::launch); return saved; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index bd189cdbe..0fc9d508a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -30,7 +30,6 @@ import org.apache.streampark.common.fs.LfsOperator; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.domain.RestRequest; @@ -149,8 +148,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -172,16 +170,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-deploy-executor"), - new ThreadPoolExecutor.AbortPolicy()); - private static final Pattern JOB_NAME_PATTERN = Pattern.compile("^[.\\x{4e00}-\\x{9fa5}A-Za-z\\d_\\-\\s]+$"); @@ -1313,6 +1301,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli customSavepoint, namespace); + ExecutorService executorService = Executors.newSingleThreadExecutor(); final Date triggerTime = new Date(); CompletableFuture<CancelResponse> cancelFuture = CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); @@ -1603,6 +1592,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli k8sNamespace, exposedType); + ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture<SubmitResponse> future = CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 88690f50e..777ce177c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.exception.ApiAlertException; @@ -58,9 +57,8 @@ import java.util.Collection; import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -73,16 +71,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli private static final String ERROR_CLUSTER_QUEUE_HINT = "Queue label '%s' isn't available in database, please add it first."; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-cluster-executor"), - new ThreadPoolExecutor.AbortPolicy()); - @Autowired private FlinkEnvService flinkEnvService; @Autowired private ServiceHelper serviceHelper; @@ -166,11 +154,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli + " is already running in the yarn queue, please check!"); } + ExecutorService executorService = Executors.newSingleThreadExecutor(); try { // 1) deployRequest DeployRequest deployRequest = getDeployRequest(flinkCluster); - log.info("deploy cluster request: {}", deployRequest); + log.info("deploy cluster request: {}", deployRequest); Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest)); DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS); @@ -310,6 +299,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli // 4) shutdown DeployRequest deployRequest = getDeployRequest(flinkCluster); try { + ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<ShutDownResponse> future = executorService.submit(() -> FlinkClient.shutdown(deployRequest)); ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index 931dd712f..66ca9d9f6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -21,7 +21,6 @@ import org.apache.streampark.common.conf.CommonConfig; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.util.CompletableFutureUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; @@ -67,9 +66,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j @@ -82,16 +79,6 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-build-executor"), - new ThreadPoolExecutor.AbortPolicy()); - @Override public RestResponse create(Project project) { LambdaQueryWrapper<Project> queryWrapper = @@ -222,7 +209,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> flinkRESTAPIWatcher.init(); }); CompletableFuture<Void> buildTask = - CompletableFuture.runAsync(projectBuildTask, executorService); + CompletableFuture.runAsync(projectBuildTask, Executors.newSingleThreadExecutor()); // TODO May need to define parameters to set the build timeout in the future. CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 8dde05c8b..81242848a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; @@ -72,8 +71,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -95,16 +93,6 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("trigger-savepoint-executor"), - new ThreadPoolExecutor.AbortPolicy()); - @Override public void expire(Long appId) { SavePoint savePoint = new SavePoint(); @@ -315,6 +303,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint customSavepoint, application.getK8sNamespace()); + ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture<SavepointResponse> savepointFuture = CompletableFuture.supplyAsync(() -> FlinkClient.triggerSavepoint(request), executorService); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java index 9cc077718..598a6ec37 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java @@ -43,9 +43,7 @@ import org.springframework.stereotype.Component; import java.util.Date; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; import scala.Enumeration; @@ -62,15 +60,8 @@ public class FlinkK8sChangeEventListener { @Lazy @Autowired private CheckpointProcessor checkpointProcessor; - private final ExecutorService executor = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 20L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-notify-executor"), - new ThreadPoolExecutor.AbortPolicy()); + private final ExecutorService notifyExecutor = + Executors.newCachedThreadPool(ThreadUtils.threadFactory("streampark-notify-executor")); /** * Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update @@ -98,7 +89,7 @@ public class FlinkK8sChangeEventListener { || FlinkAppState.LOST.equals(state) || FlinkAppState.RESTARTING.equals(state) || FlinkAppState.FINISHED.equals(state)) { - executor.execute(() -> alertService.alert(app, state)); + notifyExecutor.execute(() -> alertService.alert(app, state)); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index f1b1b5a61..8dd566ab0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -60,8 +60,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -142,14 +141,8 @@ public class FlinkRESTAPIWatcher { private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); - private static final ExecutorService EXECUTOR = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("flink-restapi-watching-executor")); + private static final ExecutorService watchExecutor = + Executors.newCachedThreadPool(ThreadUtils.threadFactory("flink-restapi-watching-executor")); @PostConstruct public void init() { @@ -201,7 +194,7 @@ public class FlinkRESTAPIWatcher { } private void watch(Long key, Application application) { - EXECUTOR.execute( + watchExecutor.execute( () -> { final StopFrom stopFrom = STOP_FROM_MAP.getOrDefault(key, null) == null
