This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch thread
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 0bd8255f79c0162930da7c1a0e436404c90571fd
Author: benjobs <[email protected]>
AuthorDate: Mon Feb 5 17:38:10 2024 +0800

    [Improve] executors improvement
---
 .../core/service/impl/AppBuildPipeServiceImpl.java     | 18 +++---------------
 .../core/service/impl/ApplicationServiceImpl.java      | 16 +++-------------
 .../core/service/impl/FlinkClusterServiceImpl.java     | 18 ++++--------------
 .../console/core/service/impl/ProjectServiceImpl.java  | 17 ++---------------
 .../core/service/impl/SavePointServiceImpl.java        | 15 ++-------------
 .../console/core/task/FlinkK8sChangeEventListener.java | 17 ++++-------------
 .../console/core/task/FlinkRESTAPIWatcher.java         | 15 ++++-----------
 7 files changed, 22 insertions(+), 94 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 00f46799e..7b3b70058 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -22,7 +22,6 @@ import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.util.WebUtils;
@@ -101,8 +100,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -135,16 +133,6 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ApplicationConfigService applicationConfigService;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   private static final Cache<Long, DockerPullSnapshot> DOCKER_PULL_PG_SNAPSHOTS =
       Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
 
@@ -291,8 +279,8 @@ public class AppBuildPipeServiceImpl
     DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
     DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
     DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
-    // async release pipeline
-    executorService.submit((Runnable) pipeline::launch);
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    executorService.submit(pipeline::launch);
     return saved;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index bd189cdbe..0fc9d508a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -30,7 +30,6 @@ import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -149,8 +148,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -172,16 +170,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
   private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-deploy-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   private static final Pattern JOB_NAME_PATTERN =
       Pattern.compile("^[.\\x{4e00}-\\x{9fa5}A-Za-z\\d_\\-\\s]+$");
 
@@ -1313,6 +1301,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             customSavepoint,
             namespace);
 
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
     final Date triggerTime = new Date();
     CompletableFuture<CancelResponse> cancelFuture =
         CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
@@ -1603,6 +1592,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             k8sNamespace,
             exposedType);
 
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
     CompletableFuture<SubmitResponse> future =
         CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 88690f50e..777ce177c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -58,9 +57,8 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -73,16 +71,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   private static final String ERROR_CLUSTER_QUEUE_HINT =
       "Queue label '%s' isn't available in database, please add it first.";
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-cluster-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   @Autowired private FlinkEnvService flinkEnvService;
 
   @Autowired private ServiceHelper serviceHelper;
@@ -166,11 +154,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
               + " is already running in the yarn queue, please check!");
     }
 
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
     try {
       // 1) deployRequest
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
-      log.info("deploy cluster request: {}", deployRequest);
 
+      log.info("deploy cluster request: {}", deployRequest);
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
@@ -310,6 +299,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     // 4) shutdown
     DeployRequest deployRequest = getDeployRequest(flinkCluster);
     try {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
       Future<ShutDownResponse> future =
           executorService.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 931dd712f..66ca9d9f6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,7 +21,6 @@ import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.util.CompletableFutureUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -67,9 +66,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -82,16 +79,6 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
 
   @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-build-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   @Override
   public RestResponse create(Project project) {
     LambdaQueryWrapper<Project> queryWrapper =
@@ -222,7 +209,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
               flinkRESTAPIWatcher.init();
             });
     CompletableFuture<Void> buildTask =
-        CompletableFuture.runAsync(projectBuildTask, executorService);
+        CompletableFuture.runAsync(projectBuildTask, Executors.newSingleThreadExecutor());
     // TODO May need to define parameters to set the build timeout in the future.
     CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 8dde05c8b..81242848a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -72,8 +71,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -95,16 +93,6 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
 
-  private final ExecutorService executorService =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("trigger-savepoint-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
-
   @Override
   public void expire(Long appId) {
     SavePoint savePoint = new SavePoint();
@@ -315,6 +303,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
             customSavepoint,
             application.getK8sNamespace());
 
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
     CompletableFuture<SavepointResponse> savepointFuture =
         CompletableFuture.supplyAsync(() -> FlinkClient.triggerSavepoint(request), executorService);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 9cc077718..598a6ec37 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -43,9 +43,7 @@ import org.springframework.stereotype.Component;
 
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
 
 import scala.Enumeration;
 
@@ -62,15 +60,8 @@ public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private CheckpointProcessor checkpointProcessor;
 
-  private final ExecutorService executor =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          20L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("streampark-notify-executor"),
-          new ThreadPoolExecutor.AbortPolicy());
+  private final ExecutorService notifyExecutor =
+      Executors.newCachedThreadPool(ThreadUtils.threadFactory("streampark-notify-executor"));
 
   /**
    * Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update
@@ -98,7 +89,7 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
         || FlinkAppState.FINISHED.equals(state)) {
-      executor.execute(() -> alertService.alert(app, state));
+      notifyExecutor.execute(() -> alertService.alert(app, state));
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index f1b1b5a61..8dd566ab0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -60,8 +60,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -142,14 +141,8 @@ public class FlinkRESTAPIWatcher {
 
   private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
 
-  private static final ExecutorService EXECUTOR =
-      new ThreadPoolExecutor(
-          Runtime.getRuntime().availableProcessors() * 5,
-          Runtime.getRuntime().availableProcessors() * 10,
-          60L,
-          TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(1024),
-          ThreadUtils.threadFactory("flink-restapi-watching-executor"));
+  private static final ExecutorService watchExecutor =
+      Executors.newCachedThreadPool(ThreadUtils.threadFactory("flink-restapi-watching-executor"));
 
   @PostConstruct
   public void init() {
@@ -201,7 +194,7 @@ public class FlinkRESTAPIWatcher {
   }
 
   private void watch(Long key, Application application) {
-    EXECUTOR.execute(
+    watchExecutor.execute(
         () -> {
           final StopFrom stopFrom =
               STOP_FROM_MAP.getOrDefault(key, null) == null

Reply via email to