This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d2b1bf419 [Hotfix][Zeta] The pipeline needs to wait for the CheckpointCoordinator to end (#4272)
d2b1bf419 is described below

commit d2b1bf4193d1a54ad8a65c48a1c161b2acf08942
Author: Eric <[email protected]>
AuthorDate: Mon Mar 6 14:23:18 2023 +0800

    [Hotfix][Zeta] The pipeline needs to wait for the CheckpointCoordinator to end (#4272)
---
 .../server/checkpoint/CheckpointCoordinator.java   | 33 ++++++++++++-
 .../checkpoint/CheckpointCoordinatorState.java     | 34 +++++++++++++
 .../checkpoint/CheckpointCoordinatorStatus.java    | 26 ++++++++++
 .../server/checkpoint/CheckpointManager.java       | 39 +++++++++------
 .../operation/NotifyTaskStartOperation.java        |  4 +-
 .../engine/server/dag/physical/PhysicalPlan.java   | 15 ------
 .../engine/server/dag/physical/SubPlan.java        | 55 ++++++++++++++++------
 .../seatunnel/engine/server/master/JobMaster.java  |  4 +-
 .../engine/server/master/JobMasterTest.java        |  5 +-
 9 files changed, 162 insertions(+), 53 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index f7e8626c4..260f09582 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -126,6 +126,8 @@ public class CheckpointCoordinator {
 
     private final ExecutorService executorService;
 
+    private CompletableFuture<CheckpointCoordinatorState> 
checkpointCoordinatorFuture;
+
     @SneakyThrows
     public CheckpointCoordinator(
             CheckpointManager manager,
@@ -168,6 +170,7 @@ public class CheckpointCoordinator {
             this.latestCompletedCheckpoint =
                     serializer.deserialize(pipelineState.getStates(), 
CompletedCheckpoint.class);
         }
+        this.checkpointCoordinatorFuture = new CompletableFuture();
     }
 
     public int getPipelineId() {
@@ -210,8 +213,13 @@ public class CheckpointCoordinator {
     }
 
     private void handleCoordinatorError(CheckpointCloseReason reason, 
Throwable e) {
+        CheckpointException checkpointException = new 
CheckpointException(reason, e);
         cleanPendingCheckpoint(reason);
-        checkpointManager.handleCheckpointError(pipelineId, new 
CheckpointException(reason, e));
+        checkpointCoordinatorFuture.complete(
+                new CheckpointCoordinatorState(
+                        CheckpointCoordinatorStatus.FAILED,
+                        ExceptionUtils.getMessage(checkpointException)));
+        checkpointManager.handleCheckpointError(pipelineId);
     }
 
     private void restoreTaskState(TaskLocation taskLocation) {
@@ -282,6 +290,7 @@ public class CheckpointCoordinator {
 
     protected void restoreCoordinator(boolean alreadyStarted) {
         LOG.info("received restore CheckpointCoordinator with alreadyStarted= 
" + alreadyStarted);
+        checkpointCoordinatorFuture = new CompletableFuture<>();
         
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
         shutdown = false;
         if (alreadyStarted) {
@@ -653,6 +662,8 @@ public class CheckpointCoordinator {
         latestCompletedCheckpoint = completedCheckpoint;
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
+            checkpointCoordinatorFuture.complete(
+                    new 
CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null));
         }
     }
 
@@ -669,7 +680,8 @@ public class CheckpointCoordinator {
         if (latestCompletedCheckpoint == null) {
             return false;
         }
-        return latestCompletedCheckpoint.getCheckpointType() == 
COMPLETED_POINT_TYPE;
+        return latestCompletedCheckpoint.getCheckpointType() == 
COMPLETED_POINT_TYPE
+                || latestCompletedCheckpoint.getCheckpointType() == 
SAVEPOINT_TYPE;
     }
 
     public boolean isEndOfSavePoint() {
@@ -678,4 +690,21 @@ public class CheckpointCoordinator {
         }
         return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
     }
+
+    public PassiveCompletableFuture<CheckpointCoordinatorState>
+            waitCheckpointCoordinatorComplete() {
+        return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+    }
+
+    public PassiveCompletableFuture<CheckpointCoordinatorState> 
cancelCheckpoint() {
+        // The checkpoint may already have failed before all tasks completed.
+        if (checkpointCoordinatorFuture.isDone()) {
+            return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+        }
+        cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
+        CheckpointCoordinatorState checkpointCoordinatorState =
+                new 
CheckpointCoordinatorState(CheckpointCoordinatorStatus.CANCELED, null);
+        checkpointCoordinatorFuture.complete(checkpointCoordinatorState);
+        return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
new file mode 100644
index 000000000..f417896df
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import lombok.Getter;
+
+@Getter
+public class CheckpointCoordinatorState {
+
+    private final CheckpointCoordinatorStatus checkpointCoordinatorStatus;
+
+    private final String throwableMsg;
+
+    public CheckpointCoordinatorState(
+            CheckpointCoordinatorStatus checkpointCoordinatorStatus, String 
throwableMsg) {
+        this.checkpointCoordinatorStatus = checkpointCoordinatorStatus;
+        this.throwableMsg = throwableMsg;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
new file mode 100644
index 000000000..86abfea9c
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+public enum CheckpointCoordinatorStatus {
+    FINISHED,
+
+    CANCELED,
+
+    FAILED;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 3c21e8e4f..3e18a458e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -171,8 +171,8 @@ public class CheckpointManager {
         
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
     }
 
-    protected void handleCheckpointError(int pipelineId, Throwable e) {
-        jobMaster.handleCheckpointError(pipelineId, e);
+    protected void handleCheckpointError(int pipelineId) {
+        jobMaster.handleCheckpointError(pipelineId);
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(TaskLocation 
taskLocation) {
@@ -211,18 +211,6 @@ public class CheckpointManager {
         getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
     }
 
-    /**
-     * Called by the JobMaster. <br>
-     * Listen to the {@link PipelineStatus} of the {@link SubPlan}, which is 
used to cancel the
-     * running {@link PendingCheckpoint} when the SubPlan is abnormal.
-     */
-    public CompletableFuture<Void> listenPipelineRetry(
-            int pipelineId, PipelineStatus pipelineStatus) {
-        getCheckpointCoordinator(pipelineId)
-                .cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
-        return CompletableFuture.completedFuture(null);
-    }
-
     /**
      * Called by the JobMaster. <br>
      * Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is 
used to shut down the
@@ -287,4 +275,27 @@ public class CheckpointManager {
                 jobMaster.queryTaskGroupAddress(
                         
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
     }
+
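+    /**
+     * Called by the JobMaster when all tasks have been canceled or some task has failed, to
+     * cancel the checkpoint coordinator of the given pipeline.
+     *
+     * @param pipelineId the id of the pipeline whose checkpoint coordinator should be canceled
+     * @return a future completed with the resulting {@link CheckpointCoordinatorState}
+     */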
+    public PassiveCompletableFuture<CheckpointCoordinatorState> 
cancelCheckpoint(int pipelineId) {
+        return getCheckpointCoordinator(pipelineId).cancelCheckpoint();
+    }
+
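+    /**
+     * Called by the JobMaster when all tasks have finished, to wait for the checkpoint
+     * coordinator of the given pipeline to complete.
+     *
+     * @param pipelineId the id of the pipeline whose checkpoint coordinator is awaited
+     * @return a future completed with the coordinator's {@link CheckpointCoordinatorState}
+     */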
+    public PassiveCompletableFuture<CheckpointCoordinatorState> 
waitCheckpointCoordinatorComplete(
+            int pipelineId) {
+        return 
getCheckpointCoordinator(pipelineId).waitCheckpointCoordinatorComplete();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index b75d4813f..62224aea4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -59,7 +59,9 @@ public class NotifyTaskStartOperation extends TaskOperation {
                         true,
                         exception ->
                                 exception instanceof 
TaskGroupContextNotFoundException
-                                        && 
!server.taskIsEnded(taskLocation.getTaskGroupLocation()),
+                                        || exception instanceof 
NullPointerException
+                                                && !server.taskIsEnded(
+                                                        
taskLocation.getTaskGroupLocation()),
                         Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index a0b4fe066..f8d59188b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -139,21 +139,6 @@ public class PhysicalPlan {
         future.thenAcceptAsync(
                 pipelineState -> {
                     try {
-                        // Notify checkpoint manager when the pipeline end, 
Whether the pipeline
-                        // will be restarted or not
-                        LOGGER.info(
-                                String.format(
-                                        "received pipeline %s callback, state 
%s",
-                                        subPlan.getPipelineFullName(),
-                                        pipelineState.getPipelineStatus()));
-                        if (jobMaster.getCheckpointManager() != null) {
-                            jobMaster
-                                    .getCheckpointManager()
-                                    .listenPipelineRetry(
-                                            
subPlan.getPipelineLocation().getPipelineId(),
-                                            pipelineState.getPipelineStatus())
-                                    .join();
-                        }
                         if 
(PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
                             if (subPlan.canRestorePipeline()) {
                                 subPlan.restorePipeline();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 9e8d6adb8..cc68d8629 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorState;
+import 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorStatus;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -92,8 +94,6 @@ public class SubPlan {
 
     private final Object restoreLock = new Object();
 
-    private Throwable checkpointThrowable;
-
     public SubPlan(
             int pipelineId,
             int totalPipelineNum,
@@ -179,17 +179,47 @@ public class SubPlan {
                             PipelineStatus pipelineStatus = null;
                             if (failedTaskNum.get() > 0) {
                                 pipelineStatus = PipelineStatus.FAILED;
+                                // We don't care about the checkpoint error reason when a task
+                                // has failed.
+                                jobMaster
+                                        .getCheckpointManager()
+                                        .cancelCheckpoint(getPipelineId())
+                                        .join();
                             } else if (canceledTaskNum.get() > 0) {
-                                if (checkpointThrowable != null) {
+                                pipelineStatus = PipelineStatus.CANCELED;
+                                CheckpointCoordinatorState 
checkpointCoordinatorState =
+                                        jobMaster
+                                                .getCheckpointManager()
+                                                
.cancelCheckpoint(getPipelineId())
+                                                .join();
+                                if (CheckpointCoordinatorStatus.FAILED.equals(
+                                        checkpointCoordinatorState
+                                                
.getCheckpointCoordinatorStatus())) {
                                     pipelineStatus = PipelineStatus.FAILED;
                                     errorByPhysicalVertex.set(
-                                            
ExceptionUtils.getMessage(checkpointThrowable));
-                                    checkpointThrowable = null;
-                                } else {
-                                    pipelineStatus = PipelineStatus.CANCELED;
+                                            
checkpointCoordinatorState.getThrowableMsg());
                                 }
                             } else {
                                 pipelineStatus = PipelineStatus.FINISHED;
+                                CheckpointCoordinatorState 
checkpointCoordinatorState =
+                                        jobMaster
+                                                .getCheckpointManager()
+                                                
.waitCheckpointCoordinatorComplete(getPipelineId())
+                                                .join();
+
+                                if (CheckpointCoordinatorStatus.FAILED.equals(
+                                        checkpointCoordinatorState
+                                                
.getCheckpointCoordinatorStatus())) {
+                                    pipelineStatus = PipelineStatus.FAILED;
+                                    errorByPhysicalVertex.set(
+                                            
checkpointCoordinatorState.getThrowableMsg());
+                                } else if 
(CheckpointCoordinatorStatus.CANCELED.equals(
+                                        checkpointCoordinatorState
+                                                
.getCheckpointCoordinatorStatus())) {
+                                    pipelineStatus = PipelineStatus.CANCELED;
+                                    errorByPhysicalVertex.set(
+                                            
checkpointCoordinatorState.getThrowableMsg());
+                                }
                             }
 
                             if (!checkNeedRestore(pipelineStatus)) {
@@ -339,10 +369,7 @@ public class SubPlan {
 
     private void cancelCheckpointCoordinator() {
         if (jobMaster.getCheckpointManager() != null) {
-            jobMaster
-                    .getCheckpointManager()
-                    .listenPipelineRetry(pipelineId, PipelineStatus.CANCELING)
-                    .join();
+            
jobMaster.getCheckpointManager().cancelCheckpoint(pipelineId).join();
         }
     }
 
@@ -524,12 +551,10 @@ public class SubPlan {
         return pipelineRestoreNum;
     }
 
-    public void handleCheckpointError(Throwable e) {
+    public void handleCheckpointError() {
         LOGGER.warning(
                 String.format(
-                        "%s checkpoint have error, cancel the pipeline", 
getPipelineFullName()),
-                e);
-        this.checkpointThrowable = e;
+                        "%s checkpoint have error, cancel the pipeline", 
getPipelineFullName()));
         this.cancelPipeline();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 302b2bca6..0b840e22c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -319,13 +319,13 @@ public class JobMaster {
         }
     }
 
-    public void handleCheckpointError(long pipelineId, Throwable e) {
+    public void handleCheckpointError(long pipelineId) {
         this.physicalPlan
                 .getPipelineList()
                 .forEach(
                         pipeline -> {
                             if (pipeline.getPipelineLocation().getPipelineId() 
== pipelineId) {
-                                pipeline.handleCheckpointError(e);
+                                pipeline.handleCheckpointError();
                             }
                         });
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 655c8e8a6..7e25f0c1c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,8 +27,6 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -136,8 +134,7 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
                         () -> Assertions.assertEquals(JobStatus.RUNNING, 
jobMaster.getJobStatus()));
 
         // call checkpoint timeout
-        jobMaster.handleCheckpointError(
-                1, new 
CheckpointException(CheckpointCloseReason.CHECKPOINT_EXPIRED));
+        jobMaster.handleCheckpointError(1);
 
         // Because handleCheckpointTimeout is an async method, so we need 
sleep 5s to waiting job
         // status become running again

Reply via email to