This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b8b8ca1c2b [Fix][Zeta] Introduce SeaTunnel CompletableFuture to prevent ForkJoinPool thread shortage (#8445)
b8b8ca1c2b is described below

commit b8b8ca1c2b747503f626047da2446539a9e06c9e
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jan 8 09:40:30 2025 +0800

    [Fix][Zeta] Introduce SeaTunnel CompletableFuture to prevent ForkJoinPool thread shortage (#8445)
---
 .../seatunnel/api/ChineseCharacterCheckTest.java   |   2 +-
 ...assCheckTest.java => ImportClassCheckTest.java} |  91 ++++++++---
 .../apache/seatunnel/api/UTClassNameCheckTest.java |   2 +-
 .../container/seatunnel/SeaTunnelContainer.java    |   1 +
 .../engine/client/ConnectorPackageClientTest.java  |   2 +-
 .../engine/client/SeaTunnelClientTest.java         |   2 +-
 .../common/utils/PassiveCompletableFuture.java     |   6 +-
 .../common/utils/concurrent/CompletableFuture.java | 180 +++++++++++++++++++++
 .../utils/concurrent/CompletableFutureTest.java    | 119 ++++++++++++++
 .../core/checkpoint/CheckpointIDCounter.java       |   3 +-
 .../engine/server/CoordinatorService.java          |   2 +-
 .../engine/server/SeaTunnelServerStarter.java      |   6 +
 .../engine/server/TaskExecutionService.java        |   2 +-
 .../server/checkpoint/CheckpointCoordinator.java   |   2 +-
 .../server/checkpoint/CheckpointManager.java       |   2 +-
 .../server/checkpoint/IMapCheckpointIDCounter.java |   2 +-
 .../server/checkpoint/PendingCheckpoint.java       |   2 +-
 .../checkpoint/StandaloneCheckpointIDCounter.java  |   2 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |   2 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |   2 +-
 .../engine/server/dag/physical/PhysicalVertex.java |   2 +-
 .../engine/server/dag/physical/ResourceUtils.java  |   2 +-
 .../engine/server/dag/physical/SubPlan.java        |   2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |   2 +-
 .../operation/GetJobCheckpointOperation.java       |   2 +-
 .../operation/GetJobDetailStatusOperation.java     |   2 +-
 .../server/operation/GetJobInfoOperation.java      |   2 +-
 .../server/operation/GetJobMetricsOperation.java   |   2 +-
 .../server/operation/GetJobStatusOperation.java    |   2 +-
 .../operation/GetRunningJobMetricsOperation.java   |   2 +-
 .../server/operation/ListJobStatusOperation.java   |   2 +-
 .../operation/UploadConnectorJarOperation.java     |   2 +-
 .../resourcemanager/AbstractResourceManager.java   |   5 +-
 .../server/resourcemanager/ResourceManager.java    |   2 +-
 .../resourcemanager/ResourceRequestHandler.java    |   2 +-
 .../thirdparty/ThirdPartyResourceManager.java      |   3 +-
 .../kubernetes/KubernetesResourceManager.java      |   3 +-
 .../thirdparty/yarn/YarnResourceManager.java       |   3 +-
 .../seatunnel/engine/server/task/AbstractTask.java |   2 +-
 .../engine/server/task/SeaTunnelTask.java          |   2 +-
 .../server/task/SinkAggregatedCommitterTask.java   |   2 +-
 .../engine/server/task/SourceSeaTunnelTask.java    |   2 +-
 .../engine/server/task/TransformSeaTunnelTask.java |   3 +-
 .../server/task/flow/AbstractFlowLifeCycle.java    |   2 +-
 .../server/task/flow/ActionFlowLifeCycle.java      |   3 +-
 .../task/flow/IntermediateQueueFlowLifeCycle.java  |   2 +-
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java |   2 +-
 .../task/flow/ShuffleSourceFlowLifeCycle.java      |   2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |   2 +-
 .../server/task/flow/SourceFlowLifeCycle.java      |   2 +-
 .../server/task/flow/TransformFlowLifeCycle.java   |   2 +-
 .../engine/server/CoordinatorServiceTest.java      |  36 +++++
 .../engine/server/TaskExecutionServiceTest.java    |   2 +-
 .../server/checkpoint/CheckpointStorageTest.java   |   2 +-
 .../resourcemanager/FakeResourceManager.java       |   2 +-
 ...FakeResourceManagerForRequestSlotRetryTest.java |   2 +-
 .../engine/server/utils/PeekBlockingQueueTest.java |   3 +-
 57 files changed, 467 insertions(+), 82 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
index d0b2b838a1..edb3ca046d 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
@@ -38,7 +38,7 @@ import java.util.List;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
-import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
+import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;
 
 @Slf4j
 public class ChineseCharacterCheckTest {
diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
similarity index 63%
rename from 
seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
rename to 
seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
index 84cbddff27..5bfe523fe1 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
@@ -47,7 +47,7 @@ import java.util.stream.Stream;
 import static java.nio.file.StandardOpenOption.READ;
 
 @Slf4j
-public class ImportShadeClassCheckTest {
+public class ImportClassCheckTest {
 
     private static Map<String, NodeList<ImportDeclaration>> importsMap = new 
HashMap<>();
     private final String SEATUNNEL_SHADE_PREFIX = 
"org.apache.seatunnel.shade.";
@@ -85,58 +85,95 @@ public class ImportShadeClassCheckTest {
     @Test
     public void guavaShadeCheck() {
         Map<String, List<String>> errorMap =
-                checkShade(Collections.singletonList("com.google.common"));
-        Assertions.assertEquals(0, errorMap.size(), errorMsg("guava", 
errorMap));
+                
checkImportClassPrefixWithAll(Collections.singletonList("com.google.common"));
+        Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("guava", 
errorMap));
         log.info("check guava shade successfully");
     }
 
     @Test
     public void jacksonShadeCheck() {
         Map<String, List<String>> errorMap =
-                checkShade(
+                checkImportClassPrefixWithExclude(
                         Collections.singletonList("com.fasterxml.jackson"),
                         Arrays.asList(
                                 
"org.apache.seatunnel.format.compatible.debezium.json",
                                 
"org.apache.seatunnel.format.compatible.kafka.connect.json",
                                 "org.apache.seatunnel.connectors.druid.sink",
                                 
"org.apache.seatunnel.connectors.seatunnel.typesense.client"));
-        Assertions.assertEquals(0, errorMap.size(), errorMsg("jackson", 
errorMap));
+        Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jackson", 
errorMap));
         log.info("check jackson shade successfully");
     }
 
     @Test
     public void jettyShadeCheck() {
         Map<String, List<String>> errorMap =
-                checkShade(Collections.singletonList("org.eclipse.jetty"));
-        Assertions.assertEquals(0, errorMap.size(), errorMsg("jetty", 
errorMap));
+                
checkImportClassPrefixWithAll(Collections.singletonList("org.eclipse.jetty"));
+        Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jetty", 
errorMap));
         log.info("check jetty shade successfully");
     }
 
     @Test
     public void janinoShadeCheck() {
         Map<String, List<String>> errorMap =
-                checkShade(Arrays.asList("org.codehaus.janino", 
"org.codehaus.commons"));
-        Assertions.assertEquals(0, errorMap.size(), errorMsg("janino", 
errorMap));
+                checkImportClassPrefixWithAll(
+                        Arrays.asList("org.codehaus.janino", 
"org.codehaus.commons"));
+        Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("janino", 
errorMap));
         log.info("check janino shade successfully");
     }
 
-    private Map<String, List<String>> checkShade(List<String> prefixList) {
-        return checkShade(prefixList, Collections.emptyList());
+    @Test
+    public void javaUtilCompletableFutureCheck() {
+        Map<String, List<String>> errorMap =
+                checkImportClassPrefix(
+                        
Collections.singletonList("java.util.concurrent.CompletableFuture"),
+                        
Collections.singletonList("org.apache.seatunnel.engine"),
+                        
Collections.singletonList("org.apache.seatunnel.engine.e2e"));
+        Assertions.assertEquals(
+                0,
+                errorMap.size(),
+                errorMsg(
+                        "Can not use java.util.concurrent.CompletableFuture, 
please use 
org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture instead.",
+                        errorMap));
+        log.info("check java concurrent CompletableFuture successfully");
+    }
+
+    private Map<String, List<String>> 
checkImportClassPrefixWithAll(List<String> prefixList) {
+        return checkImportClassPrefix(prefixList, Collections.emptyList(), 
Collections.emptyList());
     }
 
-    private Map<String, List<String>> checkShade(
+    private Map<String, List<String>> checkImportClassPrefixWithExclude(
             List<String> prefixList, List<String> packageWhiteList) {
+        return checkImportClassPrefix(prefixList, Collections.emptyList(), 
packageWhiteList);
+    }
+
+    private Map<String, List<String>> checkImportClassPrefixWithInclude(
+            List<String> prefixList, List<String> packageCheckList) {
+        return checkImportClassPrefix(prefixList, packageCheckList, 
Collections.emptyList());
+    }
+
+    private Map<String, List<String>> checkImportClassPrefix(
+            List<String> prefixList, List<String> packageCheckList, 
List<String> packageWhiteList) {
+        List<String> pathWhiteList =
+                packageWhiteList.stream()
+                        .map(whitePackage -> whitePackage.replace(".", 
isWindows ? "\\" : "/"))
+                        .collect(Collectors.toList());
+        List<String> pathCheckList =
+                packageCheckList.stream()
+                        .map(whitePackage -> whitePackage.replace(".", 
isWindows ? "\\" : "/"))
+                        .collect(Collectors.toList());
         Map<String, List<String>> errorMap = new HashMap<>();
         importsMap.forEach(
                 (clazzPath, imports) -> {
-                    boolean match =
-                            packageWhiteList.stream()
-                                    .map(
-                                            whitePackage ->
-                                                    whitePackage.replace(
-                                                            ".", isWindows ? 
"\\" : "/"))
-                                    .anyMatch(clazzPath::contains);
-                    if (!match) {
+                    boolean match;
+                    if (pathCheckList.isEmpty()) {
+                        match = 
pathWhiteList.stream().noneMatch(clazzPath::contains);
+                    } else {
+                        match =
+                                
pathCheckList.stream().anyMatch(clazzPath::contains)
+                                        && 
pathWhiteList.stream().noneMatch(clazzPath::contains);
+                    }
+
+                    if (match) {
                         List<String> collect =
                                 imports.stream()
                                         .filter(
@@ -156,11 +193,17 @@ public class ImportShadeClassCheckTest {
         return errorMap;
     }
 
-    private String errorMsg(String checkType, Map<String, List<String>> 
errorMap) {
+    private String shadeErrorMsg(String checkType, Map<String, List<String>> 
errorMap) {
+        String msg =
+                String.format("%s shade is not up to code, need add prefix [", 
checkType)
+                        + SEATUNNEL_SHADE_PREFIX
+                        + "]. \n";
+        return errorMsg(msg, errorMap);
+    }
+
+    private String errorMsg(String message, Map<String, List<String>> 
errorMap) {
         StringBuilder msg = new StringBuilder();
-        msg.append(String.format("%s shade is not up to code, need add prefix 
[", checkType))
-                .append(SEATUNNEL_SHADE_PREFIX)
-                .append("]. \n");
+        msg.append(message).append("\n");
         errorMap.forEach(
                 (key, value) -> {
                     msg.append(key).append("\n");
diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
index 55a20057b4..4b8b1ffb18 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
@@ -37,7 +37,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
+import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;
 
 @Slf4j
 public class UTClassNameCheckTest {
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1b42994154..dde94542b3 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -393,6 +393,7 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                 || s.startsWith("seatunnel-coordinator-service")
                 || s.startsWith("GC task thread")
                 || s.contains("CompilerThread")
+                || s.startsWith("SeaTunnel-CompletableFuture-Thread-")
                 || s.contains("NioNetworking-closeListenerExecutor")
                 || s.contains("ForkJoinPool.commonPool")
                 || s.contains("DestroyJavaVM")
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
index 1f07cb9489..fbdceb2c13 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
@@ -35,6 +35,7 @@ import 
org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.ConnectorJarType;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -68,7 +69,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 194da82987..4aad6b3819 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -57,7 +58,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Spliterators;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index cebbca9c46..20b02f48f3 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -16,13 +16,17 @@
 
 package org.apache.seatunnel.engine.common.utils;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 
 /** A future which prevents completion by outside caller */
 public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
 
     public PassiveCompletableFuture() {}
 
+    public PassiveCompletableFuture(java.util.concurrent.CompletableFuture<T> 
chainedFuture) {
+        this(new CompletableFuture<>(chainedFuture));
+    }
+
     public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
         if (chainedFuture != null) {
             chainedFuture.whenComplete(
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
new file mode 100644
index 0000000000..b80f421b2c
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils.concurrent;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** A {@link java.util.concurrent.CompletableFuture} with own executor. */
+public class CompletableFuture<T> extends 
java.util.concurrent.CompletableFuture<T> {
+
+    public static final Executor EXECUTOR =
+            new ThreadPoolExecutor(
+                    Math.min(8, Runtime.getRuntime().availableProcessors()),
+                    Integer.MAX_VALUE,
+                    60L,
+                    TimeUnit.SECONDS,
+                    new SynchronousQueue<>(),
+                    new ThreadFactory() {
+                        private final AtomicInteger seq = new AtomicInteger();
+
+                        @Override
+                        public Thread newThread(Runnable r) {
+                            Thread thread =
+                                    new Thread(
+                                            r,
+                                            
"SeaTunnel-CompletableFuture-Thread-"
+                                                    + seq.getAndIncrement());
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+
+    public CompletableFuture() {}
+
+    public CompletableFuture(java.util.concurrent.CompletableFuture<T> future) 
{
+        future.whenComplete(
+                (value, ex) -> {
+                    if (ex != null) {
+                        super.completeExceptionally(ex);
+                    } else {
+                        super.complete(value);
+                    }
+                });
+    }
+
+    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
+        return new 
CompletableFuture<>(java.util.concurrent.CompletableFuture.allOf(cfs));
+    }
+
+    public static CompletableFuture<Void> 
allOf(java.util.concurrent.CompletableFuture<?>... cfs) {
+        return new 
CompletableFuture<>(java.util.concurrent.CompletableFuture.allOf(cfs));
+    }
+
+    public boolean complete(T value) {
+        return super.complete(value);
+    }
+
+    public static <U> CompletableFuture<U> completedFuture(U value) {
+        return new CompletableFuture<>(
+                java.util.concurrent.CompletableFuture.completedFuture(value));
+    }
+
+    public static CompletableFuture<Void> runAsync(Runnable runnable) {
+        return new CompletableFuture<>(
+                java.util.concurrent.CompletableFuture.runAsync(runnable, 
EXECUTOR));
+    }
+
+    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor 
executor) {
+        return new CompletableFuture<>(
+                java.util.concurrent.CompletableFuture.runAsync(runnable, 
executor));
+    }
+
+    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> 
fn) {
+        return new CompletableFuture<>(super.exceptionally(fn));
+    }
+
+    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super 
Throwable> action) {
+        return new CompletableFuture<>(super.whenComplete(action));
+    }
+
+    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
+        return new CompletableFuture<>(super.thenAccept(action));
+    }
+
+    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
+        return new CompletableFuture<>(
+                java.util.concurrent.CompletableFuture.supplyAsync(supplier, 
EXECUTOR));
+    }
+
+    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 
Executor executor) {
+        return new CompletableFuture<>(
+                java.util.concurrent.CompletableFuture.supplyAsync(supplier, 
executor));
+    }
+
+    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> 
fn) {
+        return new CompletableFuture<>(super.thenApply(fn));
+    }
+
+    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? 
extends U> fn) {
+        return new CompletableFuture<>(super.thenApplyAsync(fn, EXECUTOR));
+    }
+
+    public <U> CompletableFuture<U> thenApplyAsync(
+            Function<? super T, ? extends U> fn, Executor executor) {
+        return new CompletableFuture<>(super.thenApplyAsync(fn, executor));
+    }
+
+    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? 
super Throwable> action) {
+        return new CompletableFuture<>(super.whenCompleteAsync(action, 
EXECUTOR));
+    }
+
+    public CompletableFuture<T> whenCompleteAsync(
+            BiConsumer<? super T, ? super Throwable> action, Executor 
executor) {
+        return new CompletableFuture<>(super.whenCompleteAsync(action, 
executor));
+    }
+
+    public boolean completeExceptionally(Throwable ex) {
+        return super.completeExceptionally(ex);
+    }
+
+    public T get() throws InterruptedException, ExecutionException {
+        return super.get();
+    }
+
+    public T get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return super.get(timeout, unit);
+    }
+
+    public T join() {
+        return super.join();
+    }
+
+    public void obtrudeException(Throwable ex) {
+        super.obtrudeException(ex);
+    }
+
+    public void obtrudeValue(T value) {
+        super.obtrudeValue(value);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return super.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return super.isCancelled();
+    }
+
+    public boolean isDone() {
+        return super.isDone();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
new file mode 100644
index 0000000000..c1d6f5acab
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils.concurrent;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class CompletableFutureTest {
+
+    @Test
+    void testCompletableFuture() {
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        future.complete(1);
+        Assertions.assertEquals(1, future.join());
+        future = new CompletableFuture<>();
+        future.completeExceptionally(new RuntimeException());
+        Assertions.assertThrows(RuntimeException.class, future::join);
+    }
+
+    @Test
+    void testCompletedNormally() {
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        future.complete(1);
+        Assertions.assertTrue(future.isDone());
+        Assertions.assertFalse(future.isCompletedExceptionally());
+        Assertions.assertFalse(future.isCancelled());
+    }
+
+    @Test
+    void testAsyncMethodWithOwnExecutor() {
+        AtomicInteger value = new AtomicInteger(0);
+        
Assertions.assertFalse(getThreads().contains("SeaTunnel-CompletableFuture-Thread-0"));
+        CompletableFuture.runAsync(value::getAndIncrement).join();
+        
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-0"));
+        Assertions.assertEquals(1, value.get());
+        CompletableFuture.allOf(
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    value.getAndIncrement();
+                                    try {
+                                        Thread.sleep(1000);
+                                    } catch (InterruptedException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                    return null;
+                                }),
+                        CompletableFuture.supplyAsync(value::getAndIncrement))
+                .join();
+        
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-1"));
+        Assertions.assertEquals(3, value.get());
+        CompletableFuture.allOf(
+                        getWhenCompleteAsync(value),
+                        getWhenCompleteAsync(value),
+                        getWhenCompleteAsync(value))
+                .join();
+        
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-2"));
+        Assertions.assertEquals(6, value.get());
+        CompletableFuture.allOf(
+                        getThenApplyAsync(value),
+                        getThenApplyAsync(value),
+                        getThenApplyAsync(value),
+                        getThenApplyAsync(value))
+                .join();
+        
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-3"));
+        Assertions.assertEquals(10, value.get());
+    }
+
+    private static CompletableFuture<Object> 
getWhenCompleteAsync(AtomicInteger value) {
+        return CompletableFuture.completedFuture(null)
+                .whenCompleteAsync(
+                        (aVoid, throwable) -> {
+                            value.getAndIncrement();
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    private static CompletableFuture<Object> getThenApplyAsync(AtomicInteger 
value) {
+        return CompletableFuture.completedFuture(null)
+                .thenApplyAsync(
+                        aVoid -> {
+                            value.getAndIncrement();
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+    }
+
+    private static Set<String> getThreads() {
+        return Thread.getAllStackTraces().keySet().stream()
+                .map(Thread::getName)
+                .collect(Collectors.toSet());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
index f89c4bfcd4..1a3db81dfe 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -18,10 +18,9 @@
 
 package org.apache.seatunnel.engine.core.checkpoint;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
-import java.util.concurrent.CompletableFuture;
-
 /** A checkpoint ID counter. */
 public interface CheckpointIDCounter {
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5b74a1f181..5a65f248ae 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobResult;
@@ -88,7 +89,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 8e69abd528..d66433df96 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.engine.server;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.telemetry.metrics.ExportsInstanceInitializer;
 
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.util.ConcurrencyUtil;
 import lombok.NonNull;
 
 public class SeaTunnelServerStarter {
@@ -52,6 +54,10 @@ public class SeaTunnelServerStarter {
 
     private static HazelcastInstanceImpl initializeHazelcastInstance(
             @NonNull SeaTunnelConfig seaTunnelConfig, String customInstanceName) {
+
+        // set the default async executor for Hazelcast InvocationFuture
+        ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);
+
         boolean condition = checkTelemetryConfig(seaTunnelConfig);
         String instanceName =
                 customInstanceName != null
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 785b2a9fc1..d42e95d480 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
 import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import 
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
@@ -82,7 +83,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 5a69a065bb..b221b00651 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
@@ -61,7 +62,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 0bede51c70..2cfef3fce3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.FactoryUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.job.Job;
@@ -55,7 +56,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.Arrays;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 950e0be5ea..ad00f9834a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.NodeEngine;
 
 import java.nio.ByteBuffer;
 import java.util.Base64;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 4b35f3ffab..496f3313c3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -34,7 +35,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
 public class PendingCheckpoint implements Checkpoint {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
index 97c7c63614..ca65a23626 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index f8039cf649..9455d833e9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -39,7 +40,6 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index abc82cfae6..bcc49daae2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
@@ -77,7 +78,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 7f75f489e0..b65b3cc56a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.exception.TaskGroupDeployException;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -58,7 +59,6 @@ import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index 1a9bc27984..c5db9909a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import 
org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
@@ -31,7 +32,6 @@ import lombok.NonNull;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
 public class ResourceUtils {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 2d74b4c928..c1f4ca9121 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
@@ -41,7 +42,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index db3d7b1d21..46883fdfcf 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -40,6 +40,7 @@ import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
@@ -96,7 +97,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
index e32ffccaab..38da14ca44 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
@@ -29,7 +30,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class GetJobCheckpointOperation extends Operation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
index 2831a78b16..90700888b5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class GetJobDetailStatusOperation extends Operation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
index 60bbf2d178..1e52161f8d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
@@ -29,7 +30,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class GetJobInfoOperation extends Operation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
index 0668647381..0534c29f9b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
@@ -27,7 +28,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index f0958a95e8..c70ad6453a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class GetJobStatusOperation extends Operation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
index 9277c324f3..cd137b89d1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
@@ -27,7 +28,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
index e401fcab65..ffa2190c54 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
@@ -18,12 +18,12 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 
 import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class ListJobStatusOperation extends Operation implements 
AllowedDuringPassiveState {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
index b2c45fbf77..9ffd63ecf7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -30,7 +31,6 @@ import 
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 public class UploadConnectorJarOperation extends Operation implements 
IdentifiedDataSerializable {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 6c04748ccc..6d44bda504 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.resourcemanager;
 
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
@@ -39,7 +40,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
@@ -176,7 +176,8 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
     }
 
     protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
-        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
+        return new CompletableFuture<>(
+                NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address));
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 0911345eb2..31a5f5922e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -25,7 +26,6 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 public interface ResourceManager {
     void init();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 5003f4a4ef..107b0190d0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.resourcemanager;
 import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 
 import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -36,7 +37,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
index 7db839b19f..cd181f385a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 
-import java.util.concurrent.CompletableFuture;
-
 public interface ThirdPartyResourceManager {
 
     CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile 
resourceProfile);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
index abfef1175b..f6e715f96e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
@@ -18,6 +18,7 @@
 package 
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes;
 
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import 
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
@@ -25,8 +26,6 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyR
 
 import com.hazelcast.spi.impl.NodeEngine;
 
-import java.util.concurrent.CompletableFuture;
-
 public class KubernetesResourceManager extends AbstractResourceManager
         implements ThirdPartyResourceManager {
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
index 489135eef5..287d7500a0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn;
 
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import 
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
@@ -25,8 +26,6 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyR
 
 import com.hazelcast.spi.impl.NodeEngine;
 
-import java.util.concurrent.CompletableFuture;
-
 public class YarnResourceManager extends AbstractResourceManager
         implements ThirdPartyResourceManager {
     public YarnResourceManager(NodeEngine nodeEngine, EngineConfig 
engineConfig) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 7744ccd4ee..ade0b8f975 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -32,7 +33,6 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 90b05d8704..e04e12d69f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
@@ -75,7 +76,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 8b6f66316d..d2ebb5f14e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -49,7 +50,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index dbcde3e9d6..ba02ec8b72 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -42,7 +43,6 @@ import lombok.NonNull;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends 
SeaTunnelTask {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
index 9010a07a58..5f88a692b9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
 import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
@@ -33,8 +34,6 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
-import java.util.concurrent.CompletableFuture;
-
 public class TransformSeaTunnelTask extends SeaTunnelTask {
 
     private static final ILogger LOGGER = 
Logger.getLogger(TransformSeaTunnelTask.class);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
index af8044e60b..b1bb803c80 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
 import lombok.Getter;
 import lombok.Setter;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
 public class AbstractFlowLifeCycle implements FlowLifeCycle {
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
index fac2ff16e5..426a1a1bf7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
@@ -17,12 +17,11 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.server.checkpoint.Stateful;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
-import java.util.concurrent.CompletableFuture;
-
 public abstract class ActionFlowLifeCycle extends AbstractFlowLifeCycle 
implements Stateful {
 
     protected Action action;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index 8765f49b98..3b8df128f0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -19,11 +19,11 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
 public class IntermediateQueueFlowLifeCycle<T extends 
AbstractIntermediateQueue<?>>
         extends AbstractFlowLifeCycle
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 78e19328ac..cba7537c68 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -35,7 +36,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
 
 @SuppressWarnings("MagicNumber")
 @Slf4j
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 87b29eab21..654cfff67b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -34,7 +35,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 @Slf4j
 @SuppressWarnings("MagicNumber")
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 202f67bd5c..3f47ee84c3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -32,6 +32,7 @@ import 
org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -60,7 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 6c596da0c3..7843eff7f8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
 import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -57,7 +58,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 632e529169..8eaf196358 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
@@ -38,7 +39,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 @Slf4j
 public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 2053ca01a8..067d3e83d5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -25,6 +25,8 @@ import 
org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
@@ -82,6 +84,40 @@ public class CoordinatorServiceTest {
         instance2.shutdown();
     }
 
+    @Test
+    public void testInvocationFutureUseCompletableFutureExecutor() {
+        HazelcastInstanceImpl instance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "CoordinatorServiceTest_testInvocationFutureUseCompletableFutureExecutor"));
+
+        NodeEngineUtil.sendOperationToMemberNode(
+                        instance.node.getNodeEngine(),
+                        new PrintMessageOperation("hello"),
+                        instance.getCluster().getLocalMember().getAddress())
+                .whenComplete(
+                        (aVoid, error) -> {
+                            Assertions.assertTrue(
+                                    Thread.currentThread()
+                                            .getName()
+                                            .startsWith("SeaTunnel-CompletableFuture-Thread"));
+                        })
+                .join();
+
+        NodeEngineUtil.sendOperationToMasterNode(
+                        instance.node.getNodeEngine(), new PrintMessageOperation("hello"))
+                .whenCompleteAsync(
+                        (aVoid, error) -> {
+                            Assertions.assertTrue(
+                                    Thread.currentThread()
+                                            .getName()
+                                            .startsWith("SeaTunnel-CompletableFuture-Thread"));
+                        })
+                .join();
+
+        instance.shutdown();
+    }
+
     @Test
     public void testClearCoordinatorService() {
         HazelcastInstanceImpl coordinatorServiceTest =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 6fff7b9337..86ab48b770 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server;
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.server.execution.BlockTask;
 import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
 import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
@@ -52,7 +53,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 63e1277827..7b2c84655a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.FactoryUtil;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 
@@ -33,7 +34,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 0118c15879..7755effb74 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.resourcemanager;
 
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -30,7 +31,6 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.net.UnknownHostException;
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 
 /** Used to test ResourceManager, override init method to register more 
workers. */
 public class FakeResourceManager extends AbstractResourceManager {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
index 1dc427e4f8..40a239b375 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.resourcemanager;
 
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -32,7 +33,6 @@ import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Used to test ResourceManager, override init method to register more 
workers. */
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
index 6716f5a215..f298fd164a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.seatunnel.engine.server.utils;
 
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 


Reply via email to