This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c16cc2d11 [engine][checkpoint] future and thread pool resource recovery (#3072)
c16cc2d11 is described below

commit c16cc2d11fef7c78b4785804db831f88352280b8
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Oct 12 14:56:23 2022 +0800

    [engine][checkpoint] future and thread pool resource recovery (#3072)
---
 LICENSE                                            |  9 +-
 .../engine/core/checkpoint/CheckpointType.java     |  2 +-
 .../server/checkpoint/CheckpointCoordinator.java   | 96 +++++++++++++---------
 .../server/checkpoint/CheckpointException.java     | 54 ++++++++++++
 .../CheckpointFailureReason.java}                  | 33 ++++----
 .../server/checkpoint/CheckpointManager.java       | 33 +++++---
 .../server/checkpoint/PendingCheckpoint.java       | 40 +++++++--
 .../server/task/SourceSplitEnumeratorTask.java     |  4 +-
 .../engine/server/task/record/Barrier.java         | 11 ++-
 .../engine/server/TaskExecutionServiceTest.java    | 25 ++----
 10 files changed, 203 insertions(+), 104 deletions(-)

diff --git a/LICENSE b/LICENSE
index b8d8a2446..2927f5026 100644
--- a/LICENSE
+++ b/LICENSE
@@ -209,7 +209,8 @@ The text of each license is the standard Apache 2.0 license.
 
 tools/dependencies/checkLicense.sh files from 
https://github.com/apache/skywalking
 mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
-seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java 
from  https://github.com/apache/flink
+seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java 
from https://github.com/apache/flink
+seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
 from https://github.com/apache/flink
 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/
    from  https://github.com/lightbend/config
 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/
  from https://github.com/apache/flink
 
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
   from https://github.com/apache/iceberg
@@ -224,5 +225,7 @@ 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engi
 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
                                    from https://github.com/apache/flink
 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
                     from https://github.com/apache/flink
 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
               from https://github.com/hazelcast/hazelcast
-
-
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
               from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
                   from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java
            from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
     from https://github.com/apache/flink
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index b6a9818a8..fe30dc491 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -32,7 +32,7 @@ public enum CheckpointType {
     /**
      * Automatically triggered by the Task.
      */
-    AUTO_SAVEPOINT_TYPE(true, "auto-savepoint");
+    COMPLETED_POINT_TYPE(true, "completed-point");
 
     private final boolean auto;
     private final String name;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6fb96a950..6525e1beb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import static 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-import static 
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.AUTO_SAVEPOINT_TYPE;
+import static 
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
 
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
@@ -100,7 +100,7 @@ public class CheckpointCoordinator {
 
     private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
-    private CompletedCheckpoint latestCompletedCheckpoint;
+    private volatile CompletedCheckpoint latestCompletedCheckpoint;
 
     private final CheckpointCoordinatorConfiguration coordinatorConfig;
 
@@ -113,7 +113,9 @@ public class CheckpointCoordinator {
 
     private final Object lock = new Object();
 
-    private final Object autoSavepointLock = new Object();
+    /** Flag marking the coordinator as shut down (not accepting any messages any more). */
+    private volatile boolean shutdown;
+
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
                                  CheckpointStorageConfiguration storageConfig,
@@ -221,6 +223,14 @@ public class CheckpointCoordinator {
         }
     }
 
+    private boolean canTriggered() {
+        return !isCompleted() && !isShutdown();
+    }
+
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
     public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> 
pipelineSubtasks) {
         return pipelineSubtasks.stream()
             .collect(Collectors.groupingBy(TaskLocation::getTaskVertexId, 
Collectors.toList()))
@@ -228,16 +238,20 @@ public class CheckpointCoordinator {
             .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().size()));
     }
 
-    public PassiveCompletableFuture<PendingCheckpoint> startSavepoint() {
+    public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
         CompletableFuture<PendingCheckpoint> savepoint = 
createPendingCheckpoint(Instant.now().toEpochMilli(), 
CheckpointType.SAVEPOINT_TYPE);
         startTriggerPendingCheckpoint(savepoint);
-        return new PassiveCompletableFuture<>(savepoint);
+        return savepoint.join().getCompletableFuture();
     }
 
     private void 
startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> 
pendingCompletableFuture) {
-        // Trigger the barrier and wait for all tasks to ACK
         pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
-            if (AUTO_SAVEPOINT_TYPE != pendingCheckpoint.getCheckpointType()) {
+            LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+            PassiveCompletableFuture<CompletedCheckpoint> completableFuture = 
pendingCheckpoint.getCompletableFuture();
+            completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
+
+            if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType()) 
{
+                // Trigger the barrier and wait for all tasks to ACK
                 LOG.debug("trigger checkpoint barrier" + pendingCheckpoint);
                 CompletableFuture.supplyAsync(() ->
                         new 
CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
@@ -246,18 +260,13 @@ public class CheckpointCoordinator {
                     .thenApplyAsync(this::triggerCheckpoint)
                     .thenApplyAsync(invocationFutures -> 
CompletableFuture.allOf(invocationFutures).join());
             }
-            LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
-            pendingCheckpoint.getCompletableFuture()
-                .thenAcceptAsync(this::completePendingCheckpoint);
-        });
 
-        // If any task is not acked within the checkpoint timeout
-        pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
             LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
             scheduler.schedule(() -> {
+                    // If any task is not acked within the checkpoint timeout
                     if 
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && 
!pendingCheckpoint.isFullyAcknowledged()) {
                         if (tolerableFailureCheckpoints-- <= 0) {
-                            cleanPendingCheckpoint();
+                            
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
                             // TODO: notify job master to restore the pipeline.
                         }
                     }
@@ -268,22 +277,24 @@ public class CheckpointCoordinator {
     }
 
     CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long 
triggerTimestamp, CheckpointType checkpointType) {
-        CompletableFuture<Long> idFuture = CompletableFuture.supplyAsync(() -> 
{
-            try {
-                // this must happen outside the coordinator-wide lock,
-                // because it communicates with external services
-                // (in HA mode) and may block for a while.
-                return checkpointIdCounter.getAndIncrement();
-            } catch (Throwable e) {
-                throw new CompletionException(e);
-            }
-        });
-        return createPendingCheckpoint(triggerTimestamp, idFuture, 
checkpointType);
+        synchronized (lock) {
+            CompletableFuture<Long> idFuture = 
CompletableFuture.supplyAsync(() -> {
+                try {
+                    // this must happen outside the coordinator-wide lock,
+                    // because it communicates with external services
+                    // (in HA mode) and may block for a while.
+                    return checkpointIdCounter.getAndIncrement();
+                } catch (Throwable e) {
+                    throw new CompletionException(e);
+                }
+            });
+            return triggerPendingCheckpoint(triggerTimestamp, idFuture, 
checkpointType);
+        }
     }
 
-    CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long 
triggerTimestamp, CompletableFuture<Long> idFuture, CheckpointType 
checkpointType) {
+    CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(long 
triggerTimestamp, CompletableFuture<Long> idFuture, CheckpointType 
checkpointType) {
+        assert Thread.holdsLock(lock);
         latestTriggerTimestamp.set(triggerTimestamp);
-        CompletableFuture<PendingCheckpoint> completableFuture = new 
CompletableFuture<>();
         return idFuture.thenApplyAsync(checkpointId ->
             new PendingCheckpoint(this.jobId,
                 this.plan.getPipelineId(),
@@ -292,8 +303,7 @@ public class CheckpointCoordinator {
                 checkpointType,
                 getNotYetAcknowledgedTasks(),
                 getTaskStatistics(),
-                getActionStates(),
-                completableFuture)
+                getActionStates())
         ).thenApplyAsync(pendingCheckpoint -> {
             pendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), 
pendingCheckpoint);
             return pendingCheckpoint;
@@ -335,9 +345,13 @@ public class CheckpointCoordinator {
             .toArray(InvocationFuture[]::new);
     }
 
-    protected void cleanPendingCheckpoint() {
+    protected void cleanPendingCheckpoint(CheckpointFailureReason 
failureReason) {
+        pendingCheckpoints.values().forEach(pendingCheckpoint ->
+            pendingCheckpoint.abortCheckpoint(failureReason, null)
+        );
         // TODO: clear related future & scheduler task
         pendingCheckpoints.clear();
+        scheduler.shutdown();
     }
 
     protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
@@ -345,13 +359,13 @@ public class CheckpointCoordinator {
         final PendingCheckpoint pendingCheckpoint = 
pendingCheckpoints.get(checkpointId);
         TaskLocation location = ackOperation.getTaskLocation();
         LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(), 
location.getPipelineId(), location.getJobId(), 
ackOperation.getBarrier().toString());
-        if (checkpointId == Barrier.PREPARE_CLOSE_BARRIER_ID) {
-            synchronized (autoSavepointLock) {
-                if (pendingCheckpoints.get(checkpointId) == null) {
-                    CompletableFuture<PendingCheckpoint> future = 
createPendingCheckpoint(
+        if (ackOperation.getBarrier().getCheckpointType() == 
COMPLETED_POINT_TYPE) {
+            synchronized (lock) {
+                if (pendingCheckpoints.get(Barrier.PREPARE_CLOSE_BARRIER_ID) 
== null) {
+                    CompletableFuture<PendingCheckpoint> future = 
triggerPendingCheckpoint(
                         Instant.now().toEpochMilli(),
                         
CompletableFuture.completedFuture(Barrier.PREPARE_CLOSE_BARRIER_ID),
-                        AUTO_SAVEPOINT_TYPE);
+                        COMPLETED_POINT_TYPE);
                     startTriggerPendingCheckpoint(future);
                     future.join();
                 }
@@ -370,11 +384,10 @@ public class CheckpointCoordinator {
                 SubtaskStatus.RUNNING);
     }
 
-    public void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) 
{
-        LOG.info("pending checkpoint({}/{}@{}) completed!", 
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getPipelineId(), 
pendingCheckpoint.getJobId());
+    public void completePendingCheckpoint(CompletedCheckpoint 
completedCheckpoint) {
+        LOG.info("pending checkpoint({}/{}@{}) completed!", 
completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), 
completedCheckpoint.getJobId());
         pendingCounter.decrementAndGet();
-        final long checkpointId = pendingCheckpoint.getCheckpointId();
-        CompletedCheckpoint completedCheckpoint = 
pendingCheckpoint.toCompletedCheckpoint();
+        final long checkpointId = completedCheckpoint.getCheckpointId();
         pendingCheckpoints.remove(checkpointId);
         if (pendingCheckpoints.size() + 1 == 
coordinatorConfig.getMaxConcurrentCheckpoints()) {
             // latest checkpoint completed time > checkpoint interval
@@ -403,6 +416,9 @@ public class CheckpointCoordinator {
         CompletableFuture.allOf(invocationFutures).join();
         // TODO: notifyCheckpointCompleted fail
         latestCompletedCheckpoint = completedCheckpoint;
+        if (isCompleted()) {
+            
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
+        }
     }
 
     public InvocationFuture<?>[] notifyCheckpointCompleted(long checkpointId) {
@@ -417,6 +433,6 @@ public class CheckpointCoordinator {
         if (latestCompletedCheckpoint == null) {
             return false;
         }
-        return latestCompletedCheckpoint.getCheckpointType() == 
AUTO_SAVEPOINT_TYPE;
+        return latestCompletedCheckpoint.getCheckpointType() == 
COMPLETED_POINT_TYPE;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
new file mode 100644
index 000000000..32832f900
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Base class for checkpoint related exceptions. */
+public class CheckpointException extends Exception {
+
+    private static final long serialVersionUID = 3257526119022486948L;
+
+    private final CheckpointFailureReason checkpointFailureReason;
+
+    public CheckpointException(CheckpointFailureReason failureReason) {
+        super(failureReason.message());
+        this.checkpointFailureReason = checkNotNull(failureReason);
+    }
+
+    public CheckpointException(String message, CheckpointFailureReason 
failureReason) {
+        super(message + " Failure reason: " + failureReason.message());
+        this.checkpointFailureReason = checkNotNull(failureReason);
+    }
+
+    public CheckpointException(CheckpointFailureReason failureReason, 
Throwable cause) {
+        super(failureReason.message(), cause);
+        this.checkpointFailureReason = checkNotNull(failureReason);
+    }
+
+    public CheckpointException(
+            String message, CheckpointFailureReason failureReason, Throwable 
cause) {
+        super(message + " Failure reason: " + failureReason.message(), cause);
+        this.checkpointFailureReason = checkNotNull(failureReason);
+    }
+
+    public CheckpointFailureReason getCheckpointFailureReason() {
+        return checkpointFailureReason;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
similarity index 60%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 322404baa..8d4276f5e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.record;
+package org.apache.seatunnel.engine.server.checkpoint;
 
-/**
- * barrier flowing in data flow
- */
-public interface Barrier {
-    Long PREPARE_CLOSE_BARRIER_ID = 0L;
+public enum CheckpointFailureReason {
+
+    TASK_FAILURE("Task has failed."),
+    CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+    CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
+    CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
 
-    /**
-     * The ID of the barrier.
-     */
-    long getId();
+    private final String message;
 
-    /**
-     * Whether the task needs to perform a status snapshot after the barrier is aligned.
-     * For example, DDL barrier does not require a snapshot.
-     */
-    boolean snapshot();
+    CheckpointFailureReason(String message) {
+        this.message = message;
+    }
 
-    /**
-     * Barrier indicating that the task should prepare to close.
-     */
-    boolean prepareClose();
+    public String message() {
+        return message;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index cde298c41..b62e1a454 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -38,6 +38,7 @@ import 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +51,7 @@ import java.util.stream.Collectors;
  * Maintain the life cycle of the {@link CheckpointCoordinator} through the {@link CheckpointPlan} and the status of the job.
  * </p>
  */
+@Slf4j
 public class CheckpointManager {
 
     private final Long jobId;
@@ -91,7 +93,7 @@ public class CheckpointManager {
      * <br> After the savepoint is triggered, it will cause the job to stop automatically.
      */
     @SuppressWarnings("unchecked")
-    public PassiveCompletableFuture<PendingCheckpoint>[] triggerSavepoints() {
+    public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavepoints() 
{
         return coordinatorMap.values()
             .parallelStream()
             .map(CheckpointCoordinator::startSavepoint)
@@ -102,12 +104,20 @@ public class CheckpointManager {
      * Called by the JobMaster, actually triggered by the user.
      * <br> After the savepoint is triggered, it will cause the pipeline to stop automatically.
      */
-    public PassiveCompletableFuture<PendingCheckpoint> triggerSavepoint(int 
pipelineId) {
-        return coordinatorMap.get(pipelineId).startSavepoint();
+    public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int 
pipelineId) {
+        return getCheckpointCoordinator(pipelineId).startSavepoint();
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(TaskLocation 
taskLocation) {
-        return coordinatorMap.get(taskLocation.getPipelineId());
+        return getCheckpointCoordinator(taskLocation.getPipelineId());
+    }
+
+    private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
+        CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
+        if (coordinator == null) {
+            throw new RuntimeException(String.format("The checkpoint 
coordinator(%s) don't exist", pipelineId));
+        }
+        return coordinator;
     }
 
     /**
@@ -131,7 +141,7 @@ public class CheckpointManager {
         switch (executionState) {
             case FAILED:
             case CANCELED:
-                
coordinatorMap.get(groupLocation.getPipelineId()).cleanPendingCheckpoint();
+                
getCheckpointCoordinator(groupLocation.getPipelineId()).cleanPendingCheckpoint(CheckpointFailureReason.TASK_FAILURE);
                 return;
             default:
         }
@@ -142,11 +152,7 @@ public class CheckpointManager {
      * <br> Returns whether the pipeline has completed; No need to deploy/restore the {@link SubPlan} if the pipeline has been completed;
      */
     public boolean isCompletedPipeline(int pipelineId) {
-        CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
-        if (coordinator == null) {
-            throw new RuntimeException(String.format("The checkpoint 
coordinator(%s) don't exist", pipelineId));
-        }
-        return coordinator.isCompleted();
+        return getCheckpointCoordinator(pipelineId).isCompleted();
     }
 
     /**
@@ -154,7 +160,12 @@ public class CheckpointManager {
      * <br> used for the ack of the checkpoint, including the state snapshot of all {@link Action} within the {@link Task}.
      */
     public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
-        
getCheckpointCoordinator(ackOperation.getTaskLocation()).acknowledgeTask(ackOperation);
+        CheckpointCoordinator coordinator = 
getCheckpointCoordinator(ackOperation.getTaskLocation());
+        if (coordinator.isCompleted()) {
+            log.info("The checkpoint coordinator({}) is completed", 
ackOperation.getTaskLocation().getPipelineId());
+            return;
+        }
+        coordinator.acknowledgeTask(ackOperation);
     }
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation 
operation) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 8432d0d57..91cb9013a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -22,7 +22,8 @@ import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 
-import lombok.AllArgsConstructor;
+import com.beust.jcommander.internal.Nullable;
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-@AllArgsConstructor
 public class PendingCheckpoint implements Checkpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingCheckpoint.class);
     private final long jobId;
@@ -51,7 +51,29 @@ public class PendingCheckpoint implements Checkpoint {
 
     private final Map<Long, ActionState> actionStates;
 
-    private final CompletableFuture<PendingCheckpoint> completableFuture;
+    private final CompletableFuture<CompletedCheckpoint> completableFuture;
+
+    @Getter
+    private CheckpointException failureCause;
+
+    public PendingCheckpoint(long jobId,
+                             int pipelineId,
+                             long checkpointId,
+                             long triggerTimestamp,
+                             CheckpointType checkpointType,
+                             Set<Long> notYetAcknowledgedTasks,
+                             Map<Long, TaskStatistics> taskStatistics,
+                             Map<Long, ActionState> actionStates) {
+        this.jobId = jobId;
+        this.pipelineId = pipelineId;
+        this.checkpointId = checkpointId;
+        this.triggerTimestamp = triggerTimestamp;
+        this.checkpointType = checkpointType;
+        this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+        this.taskStatistics = taskStatistics;
+        this.actionStates = actionStates;
+        this.completableFuture = new CompletableFuture<>();
+    }
 
     @Override
     public long getCheckpointId() {
@@ -86,7 +108,7 @@ public class PendingCheckpoint implements Checkpoint {
         return actionStates;
     }
 
-    public PassiveCompletableFuture<PendingCheckpoint> getCompletableFuture() {
+    public PassiveCompletableFuture<CompletedCheckpoint> 
getCompletableFuture() {
         return new PassiveCompletableFuture<>(completableFuture);
     }
 
@@ -114,7 +136,7 @@ public class PendingCheckpoint implements Checkpoint {
 
         if (isFullyAcknowledged()) {
             LOG.debug("checkpoint is full ack!");
-            completableFuture.complete(this);
+            completableFuture.complete(toCompletedCheckpoint());
         }
     }
 
@@ -122,7 +144,7 @@ public class PendingCheckpoint implements Checkpoint {
         return notYetAcknowledgedTasks.size() == 0;
     }
 
-    public CompletedCheckpoint toCompletedCheckpoint() {
+    private CompletedCheckpoint toCompletedCheckpoint() {
         return new CompletedCheckpoint(
             jobId,
             pipelineId,
@@ -133,4 +155,10 @@ public class PendingCheckpoint implements Checkpoint {
             actionStates,
             taskStatistics);
     }
+
+    public void abortCheckpoint(CheckpointFailureReason failureReason,
+                                @Nullable Throwable cause) {
+        this.failureCause = new CheckpointException(failureReason, cause);
+        completableFuture.completeExceptionally(failureCause);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index ebc253c6a..83fb06b82 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -29,7 +29,6 @@ import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
@@ -51,7 +50,6 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -233,7 +231,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             case RUNNING:
                 // The reader closes automatically after reading
                 if (prepareCloseStatus) {
-                    triggerBarrier(new 
CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID, 
Instant.now().toEpochMilli(), CheckpointType.AUTO_SAVEPOINT_TYPE));
+                    triggerBarrier(Barrier.completedBarrier());
                     currState = PREPARE_CLOSE;
                 }
                 break;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 322404baa..5985fd2f4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,11 +17,16 @@
 
 package org.apache.seatunnel.engine.server.task.record;
 
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+
+import java.time.Instant;
+
 /**
  * barrier flowing in data flow
  */
 public interface Barrier {
-    Long PREPARE_CLOSE_BARRIER_ID = 0L;
+    Long PREPARE_CLOSE_BARRIER_ID = Long.MAX_VALUE;
 
     /**
      * The ID of the barrier.
@@ -38,4 +43,8 @@ public interface Barrier {
      * Barrier indicating that the task should prepare to close.
      */
     boolean prepareClose();
+
+    static CheckpointBarrier completedBarrier() {
+        return new CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID, Instant.now().toEpochMilli(), CheckpointType.COMPLETED_POINT_TYPE);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 80d1cb42e..f12bff8cb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.engine.server.execution.TestTask;
 import com.google.common.collect.Lists;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -61,26 +62,6 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     }
 
     @Test
-    public void testAll() throws InterruptedException {
-        LOGGER.info("----------start Cancel test----------");
-        //testCancel();
-
-        LOGGER.info("----------start Finish test----------");
-        //testFinish();
-
-        LOGGER.info("----------start Delay test----------");
-        // This test will error while we have more and more test case.
-        //testDelay();
-        //testDelay();
-
-        LOGGER.info("----------start ThrowException test----------");
-        //testThrowException();
-
-        LOGGER.info("----------start CriticalCallTime test----------");
-        //testCriticalCallTime();
-
-    }
-
     public void testCancel() {
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
@@ -99,6 +80,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
             .untilAsserted(() -> assertEquals(CANCELED, completableFuture.get().getExecutionState()));
     }
 
+    @Test
     public void testFinish() {
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
@@ -122,6 +104,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     /**
      * Test task execution time is the same as the timer timeout
      */
+    @Test
     public void testCriticalCallTime() throws InterruptedException {
         AtomicBoolean stopMark = new AtomicBoolean(false);
         CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
@@ -153,6 +136,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
 
     }
 
+    @Test
     public void testThrowException() throws InterruptedException {
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
@@ -203,6 +187,7 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
             .untilAsserted(() -> assertEquals(FINISHED, taskCts.get().getExecutionState()));
     }
 
+    @RepeatedTest(2)
     public void testDelay() throws InterruptedException {
 
         long lowLagSleep = 10;


Reply via email to