This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b8b8ca1c2b [Fix][Zeta] Introduce SeaTunnel CompletableFuture to prevent ForkJoinPool thread shortage (#8445)
b8b8ca1c2b is described below
commit b8b8ca1c2b747503f626047da2446539a9e06c9e
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jan 8 09:40:30 2025 +0800
[Fix][Zeta] Introduce SeaTunnel CompletableFuture to prevent ForkJoinPool thread shortage (#8445)
---
.../seatunnel/api/ChineseCharacterCheckTest.java | 2 +-
...assCheckTest.java => ImportClassCheckTest.java} | 91 ++++++++---
.../apache/seatunnel/api/UTClassNameCheckTest.java | 2 +-
.../container/seatunnel/SeaTunnelContainer.java | 1 +
.../engine/client/ConnectorPackageClientTest.java | 2 +-
.../engine/client/SeaTunnelClientTest.java | 2 +-
.../common/utils/PassiveCompletableFuture.java | 6 +-
.../common/utils/concurrent/CompletableFuture.java | 180 +++++++++++++++++++++
.../utils/concurrent/CompletableFutureTest.java | 119 ++++++++++++++
.../core/checkpoint/CheckpointIDCounter.java | 3 +-
.../engine/server/CoordinatorService.java | 2 +-
.../engine/server/SeaTunnelServerStarter.java | 6 +
.../engine/server/TaskExecutionService.java | 2 +-
.../server/checkpoint/CheckpointCoordinator.java | 2 +-
.../server/checkpoint/CheckpointManager.java | 2 +-
.../server/checkpoint/IMapCheckpointIDCounter.java | 2 +-
.../server/checkpoint/PendingCheckpoint.java | 2 +-
.../checkpoint/StandaloneCheckpointIDCounter.java | 2 +-
.../engine/server/dag/physical/PhysicalPlan.java | 2 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 2 +-
.../engine/server/dag/physical/PhysicalVertex.java | 2 +-
.../engine/server/dag/physical/ResourceUtils.java | 2 +-
.../engine/server/dag/physical/SubPlan.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 2 +-
.../operation/GetJobCheckpointOperation.java | 2 +-
.../operation/GetJobDetailStatusOperation.java | 2 +-
.../server/operation/GetJobInfoOperation.java | 2 +-
.../server/operation/GetJobMetricsOperation.java | 2 +-
.../server/operation/GetJobStatusOperation.java | 2 +-
.../operation/GetRunningJobMetricsOperation.java | 2 +-
.../server/operation/ListJobStatusOperation.java | 2 +-
.../operation/UploadConnectorJarOperation.java | 2 +-
.../resourcemanager/AbstractResourceManager.java | 5 +-
.../server/resourcemanager/ResourceManager.java | 2 +-
.../resourcemanager/ResourceRequestHandler.java | 2 +-
.../thirdparty/ThirdPartyResourceManager.java | 3 +-
.../kubernetes/KubernetesResourceManager.java | 3 +-
.../thirdparty/yarn/YarnResourceManager.java | 3 +-
.../seatunnel/engine/server/task/AbstractTask.java | 2 +-
.../engine/server/task/SeaTunnelTask.java | 2 +-
.../server/task/SinkAggregatedCommitterTask.java | 2 +-
.../engine/server/task/SourceSeaTunnelTask.java | 2 +-
.../engine/server/task/TransformSeaTunnelTask.java | 3 +-
.../server/task/flow/AbstractFlowLifeCycle.java | 2 +-
.../server/task/flow/ActionFlowLifeCycle.java | 3 +-
.../task/flow/IntermediateQueueFlowLifeCycle.java | 2 +-
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 2 +-
.../task/flow/ShuffleSourceFlowLifeCycle.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 2 +-
.../server/task/flow/SourceFlowLifeCycle.java | 2 +-
.../server/task/flow/TransformFlowLifeCycle.java | 2 +-
.../engine/server/CoordinatorServiceTest.java | 36 +++++
.../engine/server/TaskExecutionServiceTest.java | 2 +-
.../server/checkpoint/CheckpointStorageTest.java | 2 +-
.../resourcemanager/FakeResourceManager.java | 2 +-
...FakeResourceManagerForRequestSlotRetryTest.java | 2 +-
.../engine/server/utils/PeekBlockingQueueTest.java | 3 +-
57 files changed, 467 insertions(+), 82 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
index d0b2b838a1..edb3ca046d 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java
@@ -38,7 +38,7 @@ import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Stream;
-import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
+import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;
@Slf4j
public class ChineseCharacterCheckTest {
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
similarity index 63%
rename from
seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
rename to
seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
index 84cbddff27..5bfe523fe1 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportShadeClassCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ImportClassCheckTest.java
@@ -47,7 +47,7 @@ import java.util.stream.Stream;
import static java.nio.file.StandardOpenOption.READ;
@Slf4j
-public class ImportShadeClassCheckTest {
+public class ImportClassCheckTest {
private static Map<String, NodeList<ImportDeclaration>> importsMap = new
HashMap<>();
private final String SEATUNNEL_SHADE_PREFIX =
"org.apache.seatunnel.shade.";
@@ -85,58 +85,95 @@ public class ImportShadeClassCheckTest {
@Test
public void guavaShadeCheck() {
Map<String, List<String>> errorMap =
- checkShade(Collections.singletonList("com.google.common"));
- Assertions.assertEquals(0, errorMap.size(), errorMsg("guava",
errorMap));
+
checkImportClassPrefixWithAll(Collections.singletonList("com.google.common"));
+ Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("guava",
errorMap));
log.info("check guava shade successfully");
}
@Test
public void jacksonShadeCheck() {
Map<String, List<String>> errorMap =
- checkShade(
+ checkImportClassPrefixWithExclude(
Collections.singletonList("com.fasterxml.jackson"),
Arrays.asList(
"org.apache.seatunnel.format.compatible.debezium.json",
"org.apache.seatunnel.format.compatible.kafka.connect.json",
"org.apache.seatunnel.connectors.druid.sink",
"org.apache.seatunnel.connectors.seatunnel.typesense.client"));
- Assertions.assertEquals(0, errorMap.size(), errorMsg("jackson",
errorMap));
+ Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jackson",
errorMap));
log.info("check jackson shade successfully");
}
@Test
public void jettyShadeCheck() {
Map<String, List<String>> errorMap =
- checkShade(Collections.singletonList("org.eclipse.jetty"));
- Assertions.assertEquals(0, errorMap.size(), errorMsg("jetty",
errorMap));
+
checkImportClassPrefixWithAll(Collections.singletonList("org.eclipse.jetty"));
+ Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("jetty",
errorMap));
log.info("check jetty shade successfully");
}
@Test
public void janinoShadeCheck() {
Map<String, List<String>> errorMap =
- checkShade(Arrays.asList("org.codehaus.janino",
"org.codehaus.commons"));
- Assertions.assertEquals(0, errorMap.size(), errorMsg("janino",
errorMap));
+ checkImportClassPrefixWithAll(
+ Arrays.asList("org.codehaus.janino",
"org.codehaus.commons"));
+ Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("janino",
errorMap));
log.info("check janino shade successfully");
}
- private Map<String, List<String>> checkShade(List<String> prefixList) {
- return checkShade(prefixList, Collections.emptyList());
+ @Test
+ public void javaUtilCompletableFutureCheck() {
+ Map<String, List<String>> errorMap =
+ checkImportClassPrefix(
+
Collections.singletonList("java.util.concurrent.CompletableFuture"),
+
Collections.singletonList("org.apache.seatunnel.engine"),
+
Collections.singletonList("org.apache.seatunnel.engine.e2e"));
+ Assertions.assertEquals(
+ 0,
+ errorMap.size(),
+ errorMsg(
+ "Can not use java.util.concurrent.CompletableFuture, please use org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture instead.",
+ errorMap));
+ log.info("check java concurrent CompletableFuture successfully");
+ }
+
+ private Map<String, List<String>>
checkImportClassPrefixWithAll(List<String> prefixList) {
+ return checkImportClassPrefix(prefixList, Collections.emptyList(),
Collections.emptyList());
}
- private Map<String, List<String>> checkShade(
+ private Map<String, List<String>> checkImportClassPrefixWithExclude(
List<String> prefixList, List<String> packageWhiteList) {
+ return checkImportClassPrefix(prefixList, Collections.emptyList(),
packageWhiteList);
+ }
+
+ private Map<String, List<String>> checkImportClassPrefixWithInclude(
+ List<String> prefixList, List<String> packageCheckList) {
+ return checkImportClassPrefix(prefixList, packageCheckList,
Collections.emptyList());
+ }
+
+ private Map<String, List<String>> checkImportClassPrefix(
+ List<String> prefixList, List<String> packageCheckList,
List<String> packageWhiteList) {
+ List<String> pathWhiteList =
+ packageWhiteList.stream()
+ .map(whitePackage -> whitePackage.replace(".",
isWindows ? "\\" : "/"))
+ .collect(Collectors.toList());
+ List<String> pathCheckList =
+ packageCheckList.stream()
+ .map(whitePackage -> whitePackage.replace(".",
isWindows ? "\\" : "/"))
+ .collect(Collectors.toList());
Map<String, List<String>> errorMap = new HashMap<>();
importsMap.forEach(
(clazzPath, imports) -> {
- boolean match =
- packageWhiteList.stream()
- .map(
- whitePackage ->
- whitePackage.replace(
- ".", isWindows ?
"\\" : "/"))
- .anyMatch(clazzPath::contains);
- if (!match) {
+ boolean match;
+ if (pathCheckList.isEmpty()) {
+ match =
pathWhiteList.stream().noneMatch(clazzPath::contains);
+ } else {
+ match =
+
pathCheckList.stream().anyMatch(clazzPath::contains)
+ &&
pathWhiteList.stream().noneMatch(clazzPath::contains);
+ }
+
+ if (match) {
List<String> collect =
imports.stream()
.filter(
@@ -156,11 +193,17 @@ public class ImportShadeClassCheckTest {
return errorMap;
}
- private String errorMsg(String checkType, Map<String, List<String>>
errorMap) {
+ private String shadeErrorMsg(String checkType, Map<String, List<String>>
errorMap) {
+ String msg =
+ String.format("%s shade is not up to code, need add prefix [",
checkType)
+ + SEATUNNEL_SHADE_PREFIX
+ + "]. \n";
+ return errorMsg(msg, errorMap);
+ }
+
+ private String errorMsg(String message, Map<String, List<String>>
errorMap) {
StringBuilder msg = new StringBuilder();
- msg.append(String.format("%s shade is not up to code, need add prefix
[", checkType))
- .append(SEATUNNEL_SHADE_PREFIX)
- .append("]. \n");
+ msg.append(message).append("\n");
errorMap.forEach(
(key, value) -> {
msg.append(key).append("\n");
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
index 55a20057b4..4b8b1ffb18 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/UTClassNameCheckTest.java
@@ -37,7 +37,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;
+import static org.apache.seatunnel.api.ImportClassCheckTest.isWindows;
@Slf4j
public class UTClassNameCheckTest {
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1b42994154..dde94542b3 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -393,6 +393,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
|| s.startsWith("seatunnel-coordinator-service")
|| s.startsWith("GC task thread")
|| s.contains("CompilerThread")
+ || s.startsWith("SeaTunnel-CompletableFuture-Thread-")
|| s.contains("NioNetworking-closeListenerExecutor")
|| s.contains("ForkJoinPool.commonPool")
|| s.contains("DestroyJavaVM")
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
index 1f07cb9489..fbdceb2c13 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java
@@ -35,6 +35,7 @@ import
org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.ConnectorJarType;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -68,7 +69,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 194da82987..4aad6b3819 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -57,7 +58,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index cebbca9c46..20b02f48f3 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -16,13 +16,17 @@
package org.apache.seatunnel.engine.common.utils;
-import java.util.concurrent.CompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
/** A future which prevents completion by outside caller */
public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
public PassiveCompletableFuture() {}
+ public PassiveCompletableFuture(java.util.concurrent.CompletableFuture<T>
chainedFuture) {
+ this(new CompletableFuture<>(chainedFuture));
+ }
+
public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
if (chainedFuture != null) {
chainedFuture.whenComplete(
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
new file mode 100644
index 0000000000..b80f421b2c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFuture.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils.concurrent;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** A {@link java.util.concurrent.CompletableFuture} with own executor. */
+public class CompletableFuture<T> extends
java.util.concurrent.CompletableFuture<T> {
+
+ public static final Executor EXECUTOR =
+ new ThreadPoolExecutor(
+ Math.min(8, Runtime.getRuntime().availableProcessors()),
+ Integer.MAX_VALUE,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ThreadFactory() {
+ private final AtomicInteger seq = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread =
+ new Thread(
+ r,
+
"SeaTunnel-CompletableFuture-Thread-"
+ + seq.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+
+ public CompletableFuture() {}
+
+ public CompletableFuture(java.util.concurrent.CompletableFuture<T> future)
{
+ future.whenComplete(
+ (value, ex) -> {
+ if (ex != null) {
+ super.completeExceptionally(ex);
+ } else {
+ super.complete(value);
+ }
+ });
+ }
+
+ public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
+ return new
CompletableFuture<>(java.util.concurrent.CompletableFuture.allOf(cfs));
+ }
+
+ public static CompletableFuture<Void>
allOf(java.util.concurrent.CompletableFuture<?>... cfs) {
+ return new
CompletableFuture<>(java.util.concurrent.CompletableFuture.allOf(cfs));
+ }
+
+ public boolean complete(T value) {
+ return super.complete(value);
+ }
+
+ public static <U> CompletableFuture<U> completedFuture(U value) {
+ return new CompletableFuture<>(
+ java.util.concurrent.CompletableFuture.completedFuture(value));
+ }
+
+ public static CompletableFuture<Void> runAsync(Runnable runnable) {
+ return new CompletableFuture<>(
+ java.util.concurrent.CompletableFuture.runAsync(runnable,
EXECUTOR));
+ }
+
+ public static CompletableFuture<Void> runAsync(Runnable runnable, Executor
executor) {
+ return new CompletableFuture<>(
+ java.util.concurrent.CompletableFuture.runAsync(runnable,
executor));
+ }
+
+ public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T>
fn) {
+ return new CompletableFuture<>(super.exceptionally(fn));
+ }
+
+ public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super
Throwable> action) {
+ return new CompletableFuture<>(super.whenComplete(action));
+ }
+
+ public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
+ return new CompletableFuture<>(super.thenAccept(action));
+ }
+
+ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
+ return new CompletableFuture<>(
+ java.util.concurrent.CompletableFuture.supplyAsync(supplier,
EXECUTOR));
+ }
+
+ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
+ return new CompletableFuture<>(
+ java.util.concurrent.CompletableFuture.supplyAsync(supplier,
executor));
+ }
+
+ public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U>
fn) {
+ return new CompletableFuture<>(super.thenApply(fn));
+ }
+
+ public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ?
extends U> fn) {
+ return new CompletableFuture<>(super.thenApplyAsync(fn, EXECUTOR));
+ }
+
+ public <U> CompletableFuture<U> thenApplyAsync(
+ Function<? super T, ? extends U> fn, Executor executor) {
+ return new CompletableFuture<>(super.thenApplyAsync(fn, executor));
+ }
+
+ public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ?
super Throwable> action) {
+ return new CompletableFuture<>(super.whenCompleteAsync(action,
EXECUTOR));
+ }
+
+ public CompletableFuture<T> whenCompleteAsync(
+ BiConsumer<? super T, ? super Throwable> action, Executor
executor) {
+ return new CompletableFuture<>(super.whenCompleteAsync(action,
executor));
+ }
+
+ public boolean completeExceptionally(Throwable ex) {
+ return super.completeExceptionally(ex);
+ }
+
+ public T get() throws InterruptedException, ExecutionException {
+ return super.get();
+ }
+
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return super.get(timeout, unit);
+ }
+
+ public T join() {
+ return super.join();
+ }
+
+ public void obtrudeException(Throwable ex) {
+ super.obtrudeException(ex);
+ }
+
+ public void obtrudeValue(T value) {
+ super.obtrudeValue(value);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return super.isCancelled();
+ }
+
+ public boolean isDone() {
+ return super.isDone();
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
new file mode 100644
index 0000000000..c1d6f5acab
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/concurrent/CompletableFutureTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils.concurrent;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class CompletableFutureTest {
+
+ @Test
+ void testCompletableFuture() {
+ CompletableFuture<Integer> future = new CompletableFuture<>();
+ future.complete(1);
+ Assertions.assertEquals(1, future.join());
+ future = new CompletableFuture<>();
+ future.completeExceptionally(new RuntimeException());
+ Assertions.assertThrows(RuntimeException.class, future::join);
+ }
+
+ @Test
+ void testCompletedNormally() {
+ CompletableFuture<Integer> future = new CompletableFuture<>();
+ future.complete(1);
+ Assertions.assertTrue(future.isDone());
+ Assertions.assertFalse(future.isCompletedExceptionally());
+ Assertions.assertFalse(future.isCancelled());
+ }
+
+ @Test
+ void testAsyncMethodWithOwnExecutor() {
+ AtomicInteger value = new AtomicInteger(0);
+
Assertions.assertFalse(getThreads().contains("SeaTunnel-CompletableFuture-Thread-0"));
+ CompletableFuture.runAsync(value::getAndIncrement).join();
+
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-0"));
+ Assertions.assertEquals(1, value.get());
+ CompletableFuture.allOf(
+ CompletableFuture.supplyAsync(
+ () -> {
+ value.getAndIncrement();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }),
+ CompletableFuture.supplyAsync(value::getAndIncrement))
+ .join();
+
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-1"));
+ Assertions.assertEquals(3, value.get());
+ CompletableFuture.allOf(
+ getWhenCompleteAsync(value),
+ getWhenCompleteAsync(value),
+ getWhenCompleteAsync(value))
+ .join();
+
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-2"));
+ Assertions.assertEquals(6, value.get());
+ CompletableFuture.allOf(
+ getThenApplyAsync(value),
+ getThenApplyAsync(value),
+ getThenApplyAsync(value),
+ getThenApplyAsync(value))
+ .join();
+
Assertions.assertTrue(getThreads().contains("SeaTunnel-CompletableFuture-Thread-3"));
+ Assertions.assertEquals(10, value.get());
+ }
+
+ private static CompletableFuture<Object>
getWhenCompleteAsync(AtomicInteger value) {
+ return CompletableFuture.completedFuture(null)
+ .whenCompleteAsync(
+ (aVoid, throwable) -> {
+ value.getAndIncrement();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private static CompletableFuture<Object> getThenApplyAsync(AtomicInteger
value) {
+ return CompletableFuture.completedFuture(null)
+ .thenApplyAsync(
+ aVoid -> {
+ value.getAndIncrement();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ }
+
+ private static Set<String> getThreads() {
+ return Thread.getAllStackTraces().keySet().stream()
+ .map(Thread::getName)
+ .collect(Collectors.toSet());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
index f89c4bfcd4..1a3db81dfe 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -18,10 +18,9 @@
package org.apache.seatunnel.engine.core.checkpoint;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
-import java.util.concurrent.CompletableFuture;
-
/** A checkpoint ID counter. */
public interface CheckpointIDCounter {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5b74a1f181..5a65f248ae 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
@@ -88,7 +89,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 8e69abd528..d66433df96 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.telemetry.metrics.ExportsInstanceInitializer;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.util.ConcurrencyUtil;
import lombok.NonNull;
public class SeaTunnelServerStarter {
@@ -52,6 +54,10 @@ public class SeaTunnelServerStarter {
private static HazelcastInstanceImpl initializeHazelcastInstance(
@NonNull SeaTunnelConfig seaTunnelConfig, String
customInstanceName) {
+
+ // set the default async executor for Hazelcast InvocationFuture
+ ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);
+
boolean condition = checkTelemetryConfig(seaTunnelConfig);
String instanceName =
customInstanceName != null
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 785b2a9fc1..d42e95d480 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
@@ -82,7 +83,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 5a69a065bb..b221b00651 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
@@ -61,7 +62,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 0bede51c70..2cfef3fce3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.job.Job;
@@ -55,7 +56,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 950e0be5ea..ad00f9834a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.NodeEngine;
import java.nio.ByteBuffer;
import java.util.Base64;
-import java.util.concurrent.CompletableFuture;
import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 4b35f3ffab..496f3313c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -34,7 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
public class PendingCheckpoint implements Checkpoint {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
index 97c7c63614..ca65a23626 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index f8039cf649..9455d833e9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -39,7 +40,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index abc82cfae6..bcc49daae2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
@@ -77,7 +78,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 7f75f489e0..b65b3cc56a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.exception.TaskGroupDeployException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -58,7 +59,6 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index 1a9bc27984..c5db9909a5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import
org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
@@ -31,7 +32,6 @@ import lombok.NonNull;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
public class ResourceUtils {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 2d74b4c928..c1f4ca9121 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
@@ -41,7 +42,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index db3d7b1d21..46883fdfcf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -40,6 +40,7 @@ import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
@@ -96,7 +97,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
index e32ffccaab..38da14ca44 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobCheckpointOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -29,7 +30,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class GetJobCheckpointOperation extends Operation
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
index 2831a78b16..90700888b5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class GetJobDetailStatusOperation extends Operation
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
index 60bbf2d178..1e52161f8d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobInfoOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -29,7 +30,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class GetJobInfoOperation extends Operation
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
index 0668647381..0534c29f9b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -27,7 +28,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index f0958a95e8..c70ad6453a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -28,7 +29,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class GetJobStatusOperation extends Operation
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
index 9277c324f3..cd137b89d1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -27,7 +28,6 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
index e401fcab65..ffa2190c54 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java
@@ -18,12 +18,12 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ListJobStatusOperation extends Operation implements AllowedDuringPassiveState {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
index b2c45fbf77..9ffd63ecf7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/UploadConnectorJarOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
@@ -30,7 +31,6 @@ import
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class UploadConnectorJarOperation extends Operation implements
IdentifiedDataSerializable {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 6c04748ccc..6d44bda504 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.resourcemanager;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
@@ -39,7 +40,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -176,7 +176,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
}
protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
- return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
+ return new CompletableFuture<>(
+ NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 0911345eb2..31a5f5922e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -25,7 +26,6 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
public interface ResourceManager {
void init();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 5003f4a4ef..107b0190d0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.resourcemanager;
import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.engine.common.runtime.DeployType;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -36,7 +37,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
index 7db839b19f..cd181f385a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/ThirdPartyResourceManager.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.engine.server.resourcemanager.thirdparty;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
-import java.util.concurrent.CompletableFuture;
-
public interface ThirdPartyResourceManager {
CompletableFuture<CreateWorkerResult> createNewWorker(ResourceProfile resourceProfile);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
index abfef1175b..f6e715f96e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/kubernetes/KubernetesResourceManager.java
@@ -18,6 +18,7 @@
package
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.kubernetes;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
@@ -25,8 +26,6 @@ import
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyR
import com.hazelcast.spi.impl.NodeEngine;
-import java.util.concurrent.CompletableFuture;
-
public class KubernetesResourceManager extends AbstractResourceManager
implements ThirdPartyResourceManager {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
index 489135eef5..287d7500a0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/thirdparty/yarn/YarnResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.resourcemanager.thirdparty.yarn;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.CreateWorkerResult;
@@ -25,8 +26,6 @@ import
org.apache.seatunnel.engine.server.resourcemanager.thirdparty.ThirdPartyR
import com.hazelcast.spi.impl.NodeEngine;
-import java.util.concurrent.CompletableFuture;
-
public class YarnResourceManager extends AbstractResourceManager
implements ThirdPartyResourceManager {
public YarnResourceManager(NodeEngine nodeEngine, EngineConfig
engineConfig) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 7744ccd4ee..ade0b8f975 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -32,7 +33,6 @@ import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 90b05d8704..e04e12d69f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
@@ -75,7 +76,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 8b6f66316d..d2ebb5f14e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -49,7 +50,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index dbcde3e9d6..ba02ec8b72 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -42,7 +43,6 @@ import lombok.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends
SeaTunnelTask {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
index 9010a07a58..5f88a692b9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TransformSeaTunnelTask.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
@@ -33,8 +34,6 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
-import java.util.concurrent.CompletableFuture;
-
public class TransformSeaTunnelTask extends SeaTunnelTask {
private static final ILogger LOGGER =
Logger.getLogger(TransformSeaTunnelTask.class);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
index af8044e60b..b1bb803c80 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
@@ -17,13 +17,13 @@
package org.apache.seatunnel.engine.server.task.flow;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import lombok.Getter;
import lombok.Setter;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
public class AbstractFlowLifeCycle implements FlowLifeCycle {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
index fac2ff16e5..426a1a1bf7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ActionFlowLifeCycle.java
@@ -17,12 +17,11 @@
package org.apache.seatunnel.engine.server.task.flow;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.server.checkpoint.Stateful;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import java.util.concurrent.CompletableFuture;
-
public abstract class ActionFlowLifeCycle extends AbstractFlowLifeCycle
implements Stateful {
protected Action action;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index 8765f49b98..3b8df128f0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -19,11 +19,11 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
public class IntermediateQueueFlowLifeCycle<T extends
AbstractIntermediateQueue<?>>
extends AbstractFlowLifeCycle
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 78e19328ac..cba7537c68 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -35,7 +36,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
@SuppressWarnings("MagicNumber")
@Slf4j
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 87b29eab21..654cfff67b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -34,7 +35,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
@Slf4j
@SuppressWarnings("MagicNumber")
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 202f67bd5c..3f47ee84c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -32,6 +32,7 @@ import
org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -60,7 +61,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 6c596da0c3..7843eff7f8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -57,7 +58,6 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 632e529169..8eaf196358 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
@@ -38,7 +39,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
@Slf4j
public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 2053ca01a8..067d3e83d5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -25,6 +25,8 @@ import
org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
@@ -82,6 +84,40 @@ public class CoordinatorServiceTest {
instance2.shutdown();
}
+ @Test
+ public void testInvocationFutureUseCompletableFutureExecutor() {
+ HazelcastInstanceImpl instance =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(
+
"CoordinatorServiceTest_testInvocationFutureUseCompletableFutureExecutor"));
+
+ NodeEngineUtil.sendOperationToMemberNode(
+ instance.node.getNodeEngine(),
+ new PrintMessageOperation("hello"),
+ instance.getCluster().getLocalMember().getAddress())
+ .whenComplete(
+ (aVoid, error) -> {
+ Assertions.assertTrue(
+ Thread.currentThread()
+ .getName()
+
.startsWith("SeaTunnel-CompletableFuture-Thread"));
+ })
+ .join();
+
+ NodeEngineUtil.sendOperationToMasterNode(
+ instance.node.getNodeEngine(), new
PrintMessageOperation("hello"))
+ .whenCompleteAsync(
+ (aVoid, error) -> {
+ Assertions.assertTrue(
+ Thread.currentThread()
+ .getName()
+
.startsWith("SeaTunnel-CompletableFuture-Thread"));
+ })
+ .join();
+
+ instance.shutdown();
+ }
+
@Test
public void testClearCoordinatorService() {
HazelcastInstanceImpl coordinatorServiceTest =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 6fff7b9337..86ab48b770 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.execution.BlockTask;
import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
@@ -52,7 +53,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 63e1277827..7b2c84655a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
@@ -33,7 +34,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 0118c15879..7755effb74 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -30,7 +31,6 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
/** Used to test ResourceManager, override init method to register more
workers. */
public class FakeResourceManager extends AbstractResourceManager {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
index 1dc427e4f8..40a239b375 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -32,7 +33,6 @@ import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/** Used to test ResourceManager, override init method to register more
workers. */
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
index 6716f5a215..f298fd164a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java
@@ -17,11 +17,12 @@
package org.apache.seatunnel.engine.server.utils;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;