This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37838165a8 [Fix][Zeta] Fix losing checkpoint scheduling in extreme cases (#9246)
37838165a8 is described below
commit 37838165a8db066bf9f4b37baf3f0a5563788cc8
Author: Jia Fan <[email protected]>
AuthorDate: Wed May 14 01:28:10 2025 +0800
[Fix][Zeta] Fix losing checkpoint scheduling in extreme cases (#9246)
---
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 3 +-
.../server/checkpoint/CheckpointCoordinator.java | 53 ++++++++++++++--------
.../checkpoint/CheckpointCoordinatorTest.java | 52 +++++++++++++++++++++
3 files changed, 88 insertions(+), 20 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 661da1b7cd..fff0646bcb 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -186,7 +186,8 @@ public class CheckpointEnableIT extends TestSuiteBase {
// check sink file is right
AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
- await().atMost(300000, TimeUnit.MILLISECONDS)
+ // the default checkpoint interval is 300s, so we need to wait for
300+60s
+ await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Container.ExecResult disableSinkFileExecResult =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index c34b69954c..505dc32051 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -137,7 +137,7 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages
anymore). */
private volatile boolean shutdown;
- private volatile boolean isAllTaskReady = false;
+ private final AtomicBoolean isAllTaskReady = new AtomicBoolean(false);
private final ExecutorService executorService;
@@ -345,7 +345,10 @@ public class CheckpointCoordinator {
return;
}
}
- isAllTaskReady = true;
+ if (!isAllTaskReady.compareAndSet(false, true)) {
+ LOG.info("all task already ready, skip notify task start");
+ return;
+ }
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
notifyCompleted(latestCompletedCheckpoint);
@@ -451,7 +454,6 @@ public class CheckpointCoordinator {
}
}
readyToCloseIdleTask.addAll(subTaskList);
- tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
}
}
@@ -470,18 +472,18 @@ public class CheckpointCoordinator {
}
protected void restoreCoordinator(boolean alreadyStarted) {
- LOG.info("received restore CheckpointCoordinator with alreadyStarted=
" + alreadyStarted);
+ LOG.info("received restore CheckpointCoordinator with alreadyStarted =
{}", alreadyStarted);
errorByPhysicalVertex = new AtomicReference<>();
checkpointCoordinatorFuture = new CompletableFuture<>();
updateStatus(CheckpointCoordinatorStatus.RUNNING);
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
if (alreadyStarted) {
- isAllTaskReady = true;
+ isAllTaskReady.set(true);
notifyCompleted(latestCompletedCheckpoint);
tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
} else {
- isAllTaskReady = false;
+ isAllTaskReady.set(false);
}
}
@@ -492,15 +494,24 @@ public class CheckpointCoordinator {
}
final long currentTimestamp = Instant.now().toEpochMilli();
if (checkpointType.notFinalCheckpoint() &&
checkpointType.notSchemaChangeCheckpoint()) {
- long diffFromLastTimestamp = currentTimestamp -
latestTriggerTimestamp.get();
- if (diffFromLastTimestamp <= 0) {
+ if (!isAllTaskReady.get()) {
+ LOG.info("Not all tasks are ready, skipping checkpoint
trigger");
+ return;
+ }
+ long interval = currentTimestamp - latestTriggerTimestamp.get();
+ if (interval <= 0) {
LOG.error(
"The time on your server may not be incremental which
can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the
currentTimestamp: ({})",
latestTriggerTimestamp.get(),
currentTimestamp);
}
- if (diffFromLastTimestamp <
coordinatorConfig.getCheckpointInterval()
- || !isAllTaskReady) {
+ if (interval < coordinatorConfig.getCheckpointInterval()) {
+ LOG.info(
+ "skip trigger checkpoint because the last trigger
timestamp is {} and current timestamp is {}, the interval is less than config.",
+ latestTriggerTimestamp.get(),
+ currentTimestamp);
+ scheduleTriggerPendingCheckpoint(
+ checkpointType,
coordinatorConfig.getCheckpointInterval() - interval);
return;
}
}
@@ -534,6 +545,10 @@ public class CheckpointCoordinator {
// if checkpoint type are final type, we don't need to trigger
next checkpoint
if (checkpointType.notFinalCheckpoint() &&
checkpointType.notSchemaChangeCheckpoint()) {
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+ } else {
+ LOG.info(
+ "skip schedule trigger checkpoint because checkpoint
type is {}",
+ checkpointType);
}
}
}
@@ -557,7 +572,7 @@ public class CheckpointCoordinator {
return completableFutureWithError(
CheckpointCloseReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
- if (!isAllTaskReady) {
+ if (!isAllTaskReady.get()) {
return completableFutureWithError(
CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT);
}
@@ -596,7 +611,7 @@ public class CheckpointCoordinator {
CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
pendingCompletableFuture.thenAccept(
pendingCheckpoint -> {
- LOG.info("wait checkpoint completed: " +
pendingCheckpoint.getCheckpointId());
+ LOG.info("wait checkpoint completed: {}",
pendingCheckpoint.getCheckpointId());
PassiveCompletableFuture<CompletedCheckpoint>
completableFuture =
pendingCheckpoint.getCompletableFuture();
completableFuture.whenCompleteAsync(
@@ -647,8 +662,8 @@ public class CheckpointCoordinator {
}
if (coordinatorConfig.isCheckpointEnable()) {
LOG.debug(
- "Start a scheduled task to prevent checkpoint
timeouts for barrier "
- + pendingCheckpoint.getInfo());
+ "Start a scheduled task to prevent checkpoint
timeouts for barrier {}",
+ pendingCheckpoint.getInfo());
long checkpointTimeout =
coordinatorConfig.getCheckpointTimeout();
if
(pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
checkpointTimeout =
@@ -665,8 +680,8 @@ public class CheckpointCoordinator {
!= null
&&
!pendingCheckpoint.isFullyAcknowledged()) {
LOG.info(
- "timeout checkpoint: "
- +
pendingCheckpoint.getInfo());
+ "timeout checkpoint:
{}",
+
pendingCheckpoint.getInfo());
handleCoordinatorError(
CheckpointCloseReason.CHECKPOINT_EXPIRED,
null);
@@ -781,7 +796,7 @@ public class CheckpointCoordinator {
protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
shutdown = true;
- isAllTaskReady = false;
+ isAllTaskReady.set(false);
synchronized (lock) {
LOG.info("start clean pending checkpoint cause {}",
closedReason.message());
if (!pendingCheckpoints.isEmpty()) {
@@ -817,7 +832,7 @@ public class CheckpointCoordinator {
final long checkpointId = ackOperation.getBarrier().getId();
final PendingCheckpoint pendingCheckpoint =
pendingCheckpoints.get(checkpointId);
if (pendingCheckpoint == null) {
- LOG.info("skip already ack checkpoint " + checkpointId);
+ LOG.info("skip already ack checkpoint {}", checkpointId);
return;
}
TaskLocation location = ackOperation.getTaskLocation();
@@ -995,7 +1010,7 @@ public class CheckpointCoordinator {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
ExceptionUtil.isOperationNeedRetryException(exception),
+ ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOG.warn(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index 3a62cc6e64..243361327a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
@@ -29,7 +30,10 @@ import
org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -39,6 +43,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
@@ -109,4 +114,51 @@ public class CheckpointCoordinatorTest
executorService.shutdownNow();
}
}
+
+ @Test
+ void testCheckpointContinuesWorkAfterClockDrift()
+ throws CheckpointStorageException, ExecutionException,
InterruptedException,
+ TimeoutException {
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ checkpointConfig.setStorage(new CheckpointStorageConfig());
+ checkpointConfig.setCheckpointTimeout(5000);
+ checkpointConfig.setCheckpointInterval(5000);
+ Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+ planMap.put(
+ 1,
+ CheckpointPlan.builder()
+ .pipelineId(1)
+ .pipelineSubtasks(Collections.singleton(new
TaskLocation()))
+ .build());
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ CompletableFuture<Boolean> invokedHandleCheckpointError = new
CompletableFuture<>();
+ Instant now = Instant.now();
+ Instant startTime = now.minusSeconds(10);
+ try (MockedStatic<Instant> mockedInstant =
Mockito.mockStatic(Instant.class)) {
+ mockedInstant.when(Instant::now).thenReturn(startTime);
+ CheckpointManager checkpointManager =
+ new CheckpointManager(
+ 1L,
+ false,
+ nodeEngine,
+ null,
+ planMap,
+ checkpointConfig,
+ executorService,
+
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+ @Override
+ protected void handleCheckpointError(int pipelineId,
boolean neverRestore) {
+ invokedHandleCheckpointError.complete(true);
+ }
+ };
+ ReflectionUtils.setField(
+ checkpointManager.getCheckpointCoordinator(1),
+ "latestTriggerTimestamp",
+ new AtomicLong(startTime.toEpochMilli()));
+ checkpointManager.reportedPipelineRunning(1, true);
+ Assertions.assertTrue(invokedHandleCheckpointError.get(1,
TimeUnit.MINUTES));
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
}