This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 47becb96f [Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED. (#3808)
47becb96f is described below

commit 47becb96fa9fc8b4a5def6fc3a6db2ce34ea83df
Author: Hisoka <[email protected]>
AuthorDate: Wed Dec 28 15:18:46 2022 +0800

    [Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED. (#3808)
    
    * [Bug] [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED.
---
 .../engine/server/CoordinatorService.java          |  1 -
 ...ilureReason.java => CheckpointCloseReason.java} |  5 +-
 .../server/checkpoint/CheckpointCoordinator.java   | 60 ++++++++++++++--------
 .../server/checkpoint/CheckpointException.java     | 22 ++++----
 .../server/checkpoint/CheckpointManager.java       |  7 ++-
 .../server/checkpoint/PendingCheckpoint.java       | 10 ++--
 .../engine/server/dag/physical/PhysicalVertex.java |  1 +
 .../engine/server/dag/physical/SubPlan.java        |  3 +-
 .../engine/server/master/JobMasterTest.java        |  4 +-
 9 files changed, 68 insertions(+), 45 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 1f7263e1d..086fa6783 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -263,7 +263,6 @@ public class CoordinatorService {
                     onJobDone(jobMaster, jobId);
                 }
             });
-            return;
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
similarity index 90%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index ccd121f74..919500ca4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -17,17 +17,18 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
-public enum CheckpointFailureReason {
+public enum CheckpointCloseReason {
 
     PIPELINE_END("Pipeline turn to end state."),
     CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+    CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
     CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
 
     private final String message;
 
-    CheckpointFailureReason(String message) {
+    CheckpointCloseReason(String message) {
         this.message = message;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index abdde3482..8d275b657 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -46,7 +46,6 @@ import 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.Getter;
-import lombok.Setter;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,8 +122,6 @@ public class CheckpointCoordinator {
      * Flag marking the coordinator as shut down (not accepting any messages 
anymore).
      */
     private volatile boolean shutdown;
-
-    @Setter
     private volatile boolean isAllTaskReady = false;
 
     @SneakyThrows
@@ -184,17 +181,17 @@ public class CheckpointCoordinator {
                     break;
             }
         }).exceptionally(error -> {
-            handleCoordinatorError("task running failed", error, 
CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
+            handleCoordinatorError("task running failed", error, 
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
             return null;
         });
     }
 
-    private void handleCoordinatorError(String message, Throwable e, 
CheckpointFailureReason reason) {
+    private void handleCoordinatorError(String message, Throwable e, 
CheckpointCloseReason reason) {
         LOG.error(message, e);
         handleCoordinatorError(reason);
     }
 
-    private void handleCoordinatorError(CheckpointFailureReason reason) {
+    private void handleCoordinatorError(CheckpointCloseReason reason) {
         cleanPendingCheckpoint(reason);
         checkpointManager.handleCheckpointError(pipelineId, new 
CheckpointException(reason));
     }
@@ -259,10 +256,26 @@ public class CheckpointCoordinator {
         }
     }
 
+    protected void restoreCoordinator(boolean alreadyStarted) {
+        
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
+        shutdown = false;
+        if (alreadyStarted) {
+            tryTriggerPendingCheckpoint();
+            isAllTaskReady = true;
+        } else {
+            isAllTaskReady = false;
+        }
+    }
+
     protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
         synchronized (lock) {
+            if (Thread.currentThread().isInterrupted()) {
+                LOG.warn("currentThread already be interrupted, skip trigger 
checkpoint");
+                return;
+            }
             if (isCompleted() || isShutdown()) {
-                LOG.warn(String.format("can't trigger checkpoint with type: 
%s, because checkpoint coordinator already have completed checkpoint", 
checkpointType));
+                LOG.warn(String.format("can't trigger checkpoint with type: 
%s, because checkpoint coordinator already have last completed checkpoint: (%s) 
or shutdown (%b).",
+                    checkpointType, latestCompletedCheckpoint != null ? 
latestCompletedCheckpoint.getCheckpointType() : "null", shutdown));
                 return;
             }
             final long currentTimestamp = Instant.now().toEpochMilli();
@@ -322,18 +335,20 @@ public class CheckpointCoordinator {
     }
 
     private void 
startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> 
pendingCompletableFuture) {
-        pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
+        pendingCompletableFuture.thenAccept(pendingCheckpoint -> {
             LOG.info("wait checkpoint completed: " + 
pendingCheckpoint.getCheckpointId());
             PassiveCompletableFuture<CompletedCheckpoint> completableFuture = 
pendingCheckpoint.getCompletableFuture();
             completableFuture.whenComplete((completedCheckpoint, error) -> {
                 if (error != null) {
-                    handleCoordinatorError("trigger checkpoint failed", error, 
CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
-                } else {
+                    handleCoordinatorError("trigger checkpoint failed", error, 
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
+                } else if (completedCheckpoint != null) {
                     try {
                         completePendingCheckpoint(completedCheckpoint);
                     } catch (Throwable e) {
-                        handleCoordinatorError("complete checkpoint failed", 
e, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
+                        handleCoordinatorError("complete checkpoint failed", 
e, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
                     }
+                } else {
+                    LOG.info("skip this checkpoint cause by 
completedCheckpoint is null");
                 }
             });
 
@@ -359,7 +374,7 @@ public class CheckpointCoordinator {
                     // If any task is not acked within the checkpoint timeout
                     if 
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && 
!pendingCheckpoint.isFullyAcknowledged()) {
                         if (tolerableFailureCheckpoints-- <= 0) {
-                            
handleCoordinatorError(CheckpointFailureReason.CHECKPOINT_EXPIRED);
+                            
handleCoordinatorError(CheckpointCloseReason.CHECKPOINT_EXPIRED);
                         }
                     }
                 }, coordinatorConfig.getCheckpointTimeout(),
@@ -438,13 +453,18 @@ public class CheckpointCoordinator {
             .toArray(InvocationFuture[]::new);
     }
 
-    protected void cleanPendingCheckpoint(CheckpointFailureReason 
failureReason) {
+    protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
+        shutdown = true;
+        isAllTaskReady = false;
         synchronized (lock) {
-            pendingCheckpoints.values().forEach(pendingCheckpoint ->
-                pendingCheckpoint.abortCheckpoint(failureReason, null)
-            );
-            // TODO: clear related future & scheduler task
-            pendingCheckpoints.clear();
+            LOG.info("start clean pending checkpoint cause {}", 
closedReason.message());
+            if (!pendingCheckpoints.isEmpty()) {
+                pendingCheckpoints.values().forEach(pendingCheckpoint ->
+                    pendingCheckpoint.abortCheckpoint(closedReason, null)
+                );
+                // TODO: clear related future & scheduler task
+                pendingCheckpoints.clear();
+            }
             pendingCounter.set(0);
             scheduler.shutdownNow();
             scheduler = Executors.newScheduledThreadPool(
@@ -454,7 +474,6 @@ public class CheckpointCoordinator {
                     
thread.setName(String.format("checkpoint-coordinator-%s/%s", pipelineId, 
jobId));
                     return thread;
                 });
-            isAllTaskReady = false;
         }
     }
 
@@ -527,8 +546,7 @@ public class CheckpointCoordinator {
         // TODO: notifyCheckpointCompleted fail
         latestCompletedCheckpoint = completedCheckpoint;
         if (isCompleted()) {
-            shutdown = true;
-            
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
+            
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
index 32832f900..a29e21df9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
@@ -25,30 +25,30 @@ public class CheckpointException extends Exception {
 
     private static final long serialVersionUID = 3257526119022486948L;
 
-    private final CheckpointFailureReason checkpointFailureReason;
+    private final CheckpointCloseReason checkpointCloseReason;
 
-    public CheckpointException(CheckpointFailureReason failureReason) {
+    public CheckpointException(CheckpointCloseReason failureReason) {
         super(failureReason.message());
-        this.checkpointFailureReason = checkNotNull(failureReason);
+        this.checkpointCloseReason = checkNotNull(failureReason);
     }
 
-    public CheckpointException(String message, CheckpointFailureReason 
failureReason) {
+    public CheckpointException(String message, CheckpointCloseReason 
failureReason) {
         super(message + " Failure reason: " + failureReason.message());
-        this.checkpointFailureReason = checkNotNull(failureReason);
+        this.checkpointCloseReason = checkNotNull(failureReason);
     }
 
-    public CheckpointException(CheckpointFailureReason failureReason, 
Throwable cause) {
+    public CheckpointException(CheckpointCloseReason failureReason, Throwable 
cause) {
         super(failureReason.message(), cause);
-        this.checkpointFailureReason = checkNotNull(failureReason);
+        this.checkpointCloseReason = checkNotNull(failureReason);
     }
 
     public CheckpointException(
-            String message, CheckpointFailureReason failureReason, Throwable 
cause) {
+        String message, CheckpointCloseReason failureReason, Throwable cause) {
         super(message + " Failure reason: " + failureReason.message(), cause);
-        this.checkpointFailureReason = checkNotNull(failureReason);
+        this.checkpointCloseReason = checkNotNull(failureReason);
     }
 
-    public CheckpointFailureReason getCheckpointFailureReason() {
-        return checkpointFailureReason;
+    public CheckpointCloseReason getCheckpointFailureReason() {
+        return checkpointCloseReason;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index ab42cf79b..7cc6d13b8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -137,9 +137,8 @@ public class CheckpointManager {
         return getCheckpointCoordinator(pipelineId).startSavepoint();
     }
 
-    public void reportedPipelineRunning(int pipelineId) {
-        getCheckpointCoordinator(pipelineId).setAllTaskReady(true);
-        getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
+    public void reportedPipelineRunning(int pipelineId, boolean 
alreadyStarted) {
+        
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
     }
 
     protected void handleCheckpointError(int pipelineId, Throwable e) {
@@ -182,7 +181,7 @@ public class CheckpointManager {
      * <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which 
is used to cancel the running {@link PendingCheckpoint} when the SubPlan is 
abnormal.
      */
     public CompletableFuture<Void> listenPipelineRetry(int pipelineId, 
PipelineStatus pipelineStatus) {
-        
getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointFailureReason.PIPELINE_END);
+        
getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
         return CompletableFuture.completedFuture(null);
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 3fff60ad9..23ad0aa59 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -157,9 +157,13 @@ public class PendingCheckpoint implements Checkpoint {
             taskStatistics);
     }
 
-    public void abortCheckpoint(CheckpointFailureReason failureReason,
+    public void abortCheckpoint(CheckpointCloseReason closedReason,
                                 @Nullable Throwable cause) {
-        this.failureCause = new CheckpointException(failureReason, cause);
-        completableFuture.completeExceptionally(failureCause);
+        if 
(closedReason.equals(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET)) {
+            completableFuture.complete(null);
+        } else {
+            this.failureCause = new CheckpointException(closedReason, cause);
+            completableFuture.completeExceptionally(failureCause);
+        }
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 87143874f..49744bbd3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -415,6 +415,7 @@ public class PhysicalVertex {
                 }
             }
         }
+        this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
ExecutionState.CANCELED, null));
     }
 
     private void updateStateTimestamps(@NonNull ExecutionState targetState) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 5bf859158..b1fc23b3b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -338,6 +338,7 @@ public class SubPlan {
                     forcePipelineFinish();
                     return;
                 }
+                
jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false);
                 reSchedulerPipelineFuture = 
jobMaster.reSchedulerPipeline(this);
                 if (reSchedulerPipelineFuture != null) {
                     reSchedulerPipelineFuture.join();
@@ -371,7 +372,7 @@ public class SubPlan {
         } else if (PipelineStatus.CANCELING.equals(getPipelineState())) {
             cancelPipelineTasks();
         } else if (PipelineStatus.RUNNING.equals(getPipelineState())) {
-            
jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId());
+            
jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId(),
 true);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index a7ba48ce6..755d6e4b8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,8 +28,8 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointFailureReason;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -123,7 +123,7 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
             .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, 
jobMaster.getJobStatus()));
 
         // call checkpoint timeout
-        jobMaster.handleCheckpointError(1, new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        jobMaster.handleCheckpointError(1, new 
CheckpointException(CheckpointCloseReason.CHECKPOINT_EXPIRED));
 
         // Because handleCheckpointTimeout is an async method, so we need 
sleep 5s to waiting job status become running again
         Thread.sleep(5000);

Reply via email to