This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bc28eb1747 [Improve][Zeta] Move checkpoint notify complete in 
checkpoint stage (#5185)
bc28eb1747 is described below

commit bc28eb1747d880babe7e04b8b399caf20da8f74b
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 31 14:50:38 2023 +0800

    [Improve][Zeta] Move checkpoint notify complete in checkpoint stage (#5185)
---
 .../engine/server/checkpoint/CheckpointCoordinator.java  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 0f9e03df34..e6b3c3d27a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -695,14 +695,6 @@ public class CheckpointCoordinator {
                 completedCheckpoint.getCheckpointTimestamp(),
                 completedCheckpoint.getCompletedTimestamp());
         final long checkpointId = completedCheckpoint.getCheckpointId();
-        pendingCheckpoints.remove(checkpointId);
-        pendingCounter.decrementAndGet();
-        if (pendingCheckpoints.size() + 1 == 
coordinatorConfig.getMaxConcurrentCheckpoints()) {
-            // latest checkpoint completed time > checkpoint interval
-            if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
-                scheduleTriggerPendingCheckpoint(0L);
-            }
-        }
         completedCheckpoints.addLast(completedCheckpoint);
         try {
             byte[] states = serializer.serialize(completedCheckpoint);
@@ -742,6 +734,14 @@ public class CheckpointCoordinator {
                 completedCheckpoint.getJobId());
         latestCompletedCheckpoint = completedCheckpoint;
         notifyCompleted(completedCheckpoint);
+        pendingCheckpoints.remove(checkpointId);
+        pendingCounter.decrementAndGet();
+        if (pendingCheckpoints.size() + 1 == 
coordinatorConfig.getMaxConcurrentCheckpoints()) {
+            // latest checkpoint completed time > checkpoint interval
+            if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
+                scheduleTriggerPendingCheckpoint(0L);
+            }
+        }
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
             if 
(latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {

Reply via email to