This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7519817630 [Improve][Zeta] Improve CheckpointCoordinator notify complete when restore (#5136)
7519817630 is described below

commit 75198176306fd7184d308da01a0ee33a3ef70c64
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 24 10:52:22 2023 +0800

    [Improve][Zeta] Improve CheckpointCoordinator notify complete when restore (#5136)
    
    * [Improve] Improve CheckpointCoordinator notify complete when restore
    
    * update
---
 .../server/checkpoint/CheckpointCloseReason.java   |  3 ++-
 .../server/checkpoint/CheckpointCoordinator.java   | 25 ++++++++++++++++++----
 .../server/checkpoint/CheckpointManager.java       |  4 ++--
 .../seatunnel/engine/server/master/JobMaster.java  |  5 ++++-
 .../engine/server/master/JobMasterTest.java        |  2 +-
 5 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index ae1af4d41f..9f35f62fd6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -26,7 +26,8 @@ public enum CheckpointCloseReason {
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
     CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
     AGGREGATE_COMMIT_ERROR("Aggregate commit error."),
-    TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error");
+    TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error"),
+    CHECKPOINT_NOTIFY_COMPLETE_FAILED("Checkpoint notify complete failed");
 
     private final String message;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9e0ef2a53a..9bf0e77069 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -272,7 +272,8 @@ public class CheckpointCoordinator {
         checkpointCoordinatorFuture.complete(
                 new CheckpointCoordinatorState(
                         CheckpointCoordinatorStatus.FAILED, 
errorByPhysicalVertex.get()));
-        checkpointManager.handleCheckpointError(pipelineId);
+        checkpointManager.handleCheckpointError(
+                pipelineId, 
reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
     }
 
     private void restoreTaskState(TaskLocation taskLocation) {
@@ -316,9 +317,26 @@ public class CheckpointCoordinator {
         isAllTaskReady = true;
         InvocationFuture<?>[] futures = notifyTaskStart();
         CompletableFuture.allOf(futures).join();
+        notifyCompleted(latestCompletedCheckpoint);
         
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
     }
 
+    private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
+        if (completedCheckpoint != null) {
+            try {
+                LOG.info("start notify checkpoint completed, checkpoint:{}", 
completedCheckpoint);
+                InvocationFuture<?>[] invocationFutures =
+                        
notifyCheckpointCompleted(completedCheckpoint.getCheckpointId());
+                CompletableFuture.allOf(invocationFutures).join();
+            } catch (Throwable e) {
+                handleCoordinatorError(
+                        "notify checkpoint completed failed",
+                        e,
+                        
CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED);
+            }
+        }
+    }
+
     public InvocationFuture<?>[] notifyTaskStart() {
         return plan.getPipelineSubtasks().stream()
                 .map(NotifyTaskStartOperation::new)
@@ -358,6 +376,7 @@ public class CheckpointCoordinator {
         shutdown = false;
         if (alreadyStarted) {
             isAllTaskReady = true;
+            notifyCompleted(latestCompletedCheckpoint);
             tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
         } else {
             isAllTaskReady = false;
@@ -719,10 +738,8 @@ public class CheckpointCoordinator {
                 completedCheckpoint.getCheckpointId(),
                 completedCheckpoint.getPipelineId(),
                 completedCheckpoint.getJobId());
-        InvocationFuture<?>[] invocationFutures = 
notifyCheckpointCompleted(checkpointId);
-        CompletableFuture.allOf(invocationFutures).join();
-        // TODO: notifyCheckpointCompleted fail
         latestCompletedCheckpoint = completedCheckpoint;
+        notifyCompleted(completedCheckpoint);
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
             if 
(latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index f34ae2f6a0..0c5a91698e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -174,8 +174,8 @@ public class CheckpointManager {
         
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
     }
 
-    protected void handleCheckpointError(int pipelineId) {
-        jobMaster.handleCheckpointError(pipelineId);
+    protected void handleCheckpointError(int pipelineId, boolean neverRestore) 
{
+        jobMaster.handleCheckpointError(pipelineId, neverRestore);
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(TaskLocation 
taskLocation) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 13b89a69dd..11cc5f21b0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -325,7 +325,10 @@ public class JobMaster {
         }
     }
 
-    public void handleCheckpointError(long pipelineId) {
+    public void handleCheckpointError(long pipelineId, boolean neverRestore) {
+        if (neverRestore) {
+            this.neverNeedRestore();
+        }
         this.physicalPlan
                 .getPipelineList()
                 .forEach(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index efdc9e0f71..bb331bd018 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -139,7 +139,7 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
 
         jobMaster.neverNeedRestore();
         // call checkpoint timeout
-        jobMaster.handleCheckpointError(1);
+        jobMaster.handleCheckpointError(1, false);
 
         PassiveCompletableFuture<JobResult> jobMasterCompleteFuture =
                 jobMaster.getJobMasterCompleteFuture();

Reply via email to