This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37838165a8 [Fix][Zeta] Fix losing checkpoint scheduling in extreme cases (#9246)
37838165a8 is described below

commit 37838165a8db066bf9f4b37baf3f0a5563788cc8
Author: Jia Fan <[email protected]>
AuthorDate: Wed May 14 01:28:10 2025 +0800

    [Fix][Zeta] Fix losing checkpoint scheduling in extreme cases (#9246)
---
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   |  3 +-
 .../server/checkpoint/CheckpointCoordinator.java   | 53 ++++++++++++++--------
 .../checkpoint/CheckpointCoordinatorTest.java      | 52 +++++++++++++++++++++
 3 files changed, 88 insertions(+), 20 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 661da1b7cd..fff0646bcb 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -186,7 +186,8 @@ public class CheckpointEnableIT extends TestSuiteBase {
 
         // check sink file is right
         AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
-        await().atMost(300000, TimeUnit.MILLISECONDS)
+        // the default checkpoint interval is 300s, so we need to wait for 
300+60s
+        await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             Container.ExecResult disableSinkFileExecResult =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index c34b69954c..505dc32051 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -137,7 +137,7 @@ public class CheckpointCoordinator {
     /** Flag marking the coordinator as shut down (not accepting any messages 
anymore). */
     private volatile boolean shutdown;
 
-    private volatile boolean isAllTaskReady = false;
+    private final AtomicBoolean isAllTaskReady = new AtomicBoolean(false);
 
     private final ExecutorService executorService;
 
@@ -345,7 +345,10 @@ public class CheckpointCoordinator {
                 return;
             }
         }
-        isAllTaskReady = true;
+        if (!isAllTaskReady.compareAndSet(false, true)) {
+            LOG.info("all task already ready, skip notify task start");
+            return;
+        }
         InvocationFuture<?>[] futures = notifyTaskStart();
         CompletableFuture.allOf(futures).join();
         notifyCompleted(latestCompletedCheckpoint);
@@ -451,7 +454,6 @@ public class CheckpointCoordinator {
                 }
             }
             readyToCloseIdleTask.addAll(subTaskList);
-            tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
         }
     }
 
@@ -470,18 +472,18 @@ public class CheckpointCoordinator {
     }
 
     protected void restoreCoordinator(boolean alreadyStarted) {
-        LOG.info("received restore CheckpointCoordinator with alreadyStarted= 
" + alreadyStarted);
+        LOG.info("received restore CheckpointCoordinator with alreadyStarted = 
{}", alreadyStarted);
         errorByPhysicalVertex = new AtomicReference<>();
         checkpointCoordinatorFuture = new CompletableFuture<>();
         updateStatus(CheckpointCoordinatorStatus.RUNNING);
         
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
         shutdown = false;
         if (alreadyStarted) {
-            isAllTaskReady = true;
+            isAllTaskReady.set(true);
             notifyCompleted(latestCompletedCheckpoint);
             tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
         } else {
-            isAllTaskReady = false;
+            isAllTaskReady.set(false);
         }
     }
 
@@ -492,15 +494,24 @@ public class CheckpointCoordinator {
         }
         final long currentTimestamp = Instant.now().toEpochMilli();
         if (checkpointType.notFinalCheckpoint() && 
checkpointType.notSchemaChangeCheckpoint()) {
-            long diffFromLastTimestamp = currentTimestamp - 
latestTriggerTimestamp.get();
-            if (diffFromLastTimestamp <= 0) {
+            if (!isAllTaskReady.get()) {
+                LOG.info("Not all tasks are ready, skipping checkpoint 
trigger");
+                return;
+            }
+            long interval = currentTimestamp - latestTriggerTimestamp.get();
+            if (interval <= 0) {
                 LOG.error(
                         "The time on your server may not be incremental which 
can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the 
currentTimestamp: ({})",
                         latestTriggerTimestamp.get(),
                         currentTimestamp);
             }
-            if (diffFromLastTimestamp < 
coordinatorConfig.getCheckpointInterval()
-                    || !isAllTaskReady) {
+            if (interval < coordinatorConfig.getCheckpointInterval()) {
+                LOG.info(
+                        "skip trigger checkpoint because the last trigger 
timestamp is {} and current timestamp is {}, the interval is less than config.",
+                        latestTriggerTimestamp.get(),
+                        currentTimestamp);
+                scheduleTriggerPendingCheckpoint(
+                        checkpointType, 
coordinatorConfig.getCheckpointInterval() - interval);
                 return;
             }
         }
@@ -534,6 +545,10 @@ public class CheckpointCoordinator {
             // if checkpoint type are final type, we don't need to trigger 
next checkpoint
             if (checkpointType.notFinalCheckpoint() && 
checkpointType.notSchemaChangeCheckpoint()) {
                 
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+            } else {
+                LOG.info(
+                        "skip schedule trigger checkpoint because checkpoint 
type is {}",
+                        checkpointType);
             }
         }
     }
@@ -557,7 +572,7 @@ public class CheckpointCoordinator {
             return completableFutureWithError(
                     CheckpointCloseReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
         }
-        if (!isAllTaskReady) {
+        if (!isAllTaskReady.get()) {
             return completableFutureWithError(
                     CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT);
         }
@@ -596,7 +611,7 @@ public class CheckpointCoordinator {
             CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
         pendingCompletableFuture.thenAccept(
                 pendingCheckpoint -> {
-                    LOG.info("wait checkpoint completed: " + 
pendingCheckpoint.getCheckpointId());
+                    LOG.info("wait checkpoint completed: {}", 
pendingCheckpoint.getCheckpointId());
                     PassiveCompletableFuture<CompletedCheckpoint> 
completableFuture =
                             pendingCheckpoint.getCompletableFuture();
                     completableFuture.whenCompleteAsync(
@@ -647,8 +662,8 @@ public class CheckpointCoordinator {
                     }
                     if (coordinatorConfig.isCheckpointEnable()) {
                         LOG.debug(
-                                "Start a scheduled task to prevent checkpoint 
timeouts for barrier "
-                                        + pendingCheckpoint.getInfo());
+                                "Start a scheduled task to prevent checkpoint 
timeouts for barrier {}",
+                                pendingCheckpoint.getInfo());
                         long checkpointTimeout = 
coordinatorConfig.getCheckpointTimeout();
                         if 
(pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
                             checkpointTimeout =
@@ -665,8 +680,8 @@ public class CheckpointCoordinator {
                                                             != null
                                                     && 
!pendingCheckpoint.isFullyAcknowledged()) {
                                                 LOG.info(
-                                                        "timeout checkpoint: "
-                                                                + 
pendingCheckpoint.getInfo());
+                                                        "timeout checkpoint: 
{}",
+                                                        
pendingCheckpoint.getInfo());
                                                 handleCoordinatorError(
                                                         
CheckpointCloseReason.CHECKPOINT_EXPIRED,
                                                         null);
@@ -781,7 +796,7 @@ public class CheckpointCoordinator {
 
     protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
         shutdown = true;
-        isAllTaskReady = false;
+        isAllTaskReady.set(false);
         synchronized (lock) {
             LOG.info("start clean pending checkpoint cause {}", 
closedReason.message());
             if (!pendingCheckpoints.isEmpty()) {
@@ -817,7 +832,7 @@ public class CheckpointCoordinator {
         final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = 
pendingCheckpoints.get(checkpointId);
         if (pendingCheckpoint == null) {
-            LOG.info("skip already ack checkpoint " + checkpointId);
+            LOG.info("skip already ack checkpoint {}", checkpointId);
             return;
         }
         TaskLocation location = ackOperation.getTaskLocation();
@@ -995,7 +1010,7 @@ public class CheckpointCoordinator {
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
-                            exception -> 
ExceptionUtil.isOperationNeedRetryException(exception),
+                            ExceptionUtil::isOperationNeedRetryException,
                             Constant.OPERATION_RETRY_SLEEP));
         } catch (Exception e) {
             LOG.warn(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index 3a62cc6e64..243361327a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
@@ -29,7 +30,10 @@ import 
org.apache.seatunnel.engine.server.execution.TaskLocation;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,6 +43,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
 
@@ -109,4 +114,51 @@ public class CheckpointCoordinatorTest
             executorService.shutdownNow();
         }
     }
+
+    @Test
+    void testCheckpointContinuesWorkAfterClockDrift()
+            throws CheckpointStorageException, ExecutionException, 
InterruptedException,
+                    TimeoutException {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.setStorage(new CheckpointStorageConfig());
+        checkpointConfig.setCheckpointTimeout(5000);
+        checkpointConfig.setCheckpointInterval(5000);
+        Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+        planMap.put(
+                1,
+                CheckpointPlan.builder()
+                        .pipelineId(1)
+                        .pipelineSubtasks(Collections.singleton(new 
TaskLocation()))
+                        .build());
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        CompletableFuture<Boolean> invokedHandleCheckpointError = new 
CompletableFuture<>();
+        Instant now = Instant.now();
+        Instant startTime = now.minusSeconds(10);
+        try (MockedStatic<Instant> mockedInstant = 
Mockito.mockStatic(Instant.class)) {
+            mockedInstant.when(Instant::now).thenReturn(startTime);
+            CheckpointManager checkpointManager =
+                    new CheckpointManager(
+                            1L,
+                            false,
+                            nodeEngine,
+                            null,
+                            planMap,
+                            checkpointConfig,
+                            executorService,
+                            
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+                        @Override
+                        protected void handleCheckpointError(int pipelineId, 
boolean neverRestore) {
+                            invokedHandleCheckpointError.complete(true);
+                        }
+                    };
+            ReflectionUtils.setField(
+                    checkpointManager.getCheckpointCoordinator(1),
+                    "latestTriggerTimestamp",
+                    new AtomicLong(startTime.toEpochMilli()));
+            checkpointManager.reportedPipelineRunning(1, true);
+            Assertions.assertTrue(invokedHandleCheckpointError.get(1, 
TimeUnit.MINUTES));
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
 }

Reply via email to