This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b3c79d964 [Feature][Zeta] Support the min-pause configuration for
checkpoints (#9804)
3b3c79d964 is described below
commit 3b3c79d96466f51eff80752fb348f48b0b499f44
Author: xiaochen <[email protected]>
AuthorDate: Mon Sep 8 18:37:08 2025 +0800
[Feature][Zeta] Support the min-pause configuration for checkpoints (#9804)
---
docs/en/seatunnel-engine/checkpoint-storage.md | 1 -
.../seatunnel-engine/hybrid-cluster-deployment.md | 6 ++
.../separated-cluster-deployment.md | 6 ++
docs/zh/seatunnel-engine/checkpoint-storage.md | 1 -
.../seatunnel-engine/hybrid-cluster-deployment.md | 5 +
.../separated-cluster-deployment.md | 5 +
.../seatunnel/api/options/EnvCommonOptions.java | 8 ++
.../seatunnel/api/options/EnvOptionRule.java | 1 +
.../execution/AbstractFlinkRuntimeEnvironment.java | 3 +
.../core/starter/flink/utils/ConfigKeyName.java | 1 +
.../config/YamlSeaTunnelDomConfigProcessor.java | 8 ++
.../common/config/server/CheckpointConfig.java | 6 ++
.../common/config/server/ServerConfigOptions.java | 8 ++
.../server/checkpoint/CheckpointCoordinator.java | 26 ++++-
.../server/checkpoint/CheckpointManager.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 6 ++
.../checkpoint/CheckpointCoordinatorTest.java | 116 +++++++++++++++++++++
17 files changed, 204 insertions(+), 5 deletions(-)
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md
b/docs/en/seatunnel-engine/checkpoint-storage.md
index 19c617e015..973777c0f7 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -278,4 +278,3 @@ seatunnel:
disable.cache: false
fs.defaultFS: file:///
```
-
diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
index 7e09112589..474ebe3af3 100644
--- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -89,6 +89,10 @@ The interval between two checkpoints, in milliseconds. If
the `checkpoint.interv
The timeout for checkpoints. If the checkpoint cannot be completed within the
timeout, a checkpoint failure will be triggered and the job will fail. If the
`checkpoint.timeout` parameter is configured in the job configuration file's
`env`, the one set in the job configuration file will be used.
+**min-pause**
+
+The minimum pause (in milliseconds) between consecutive checkpoints. This
ensures that checkpoints are not triggered too frequently.
+
Example
```yaml
@@ -101,6 +105,8 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
+ min-pause: 5000
+
```
**checkpoint storage**
diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md
b/docs/en/seatunnel-engine/separated-cluster-deployment.md
index 4afbc033ed..d8ab285324 100644
--- a/docs/en/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -129,6 +129,11 @@ The interval between two checkpoints, in milliseconds. If
the `checkpoint.interv
The timeout time of the checkpoint. If the checkpoint cannot be completed
within the timeout time, it will trigger a checkpoint failure and the job
fails. If the `checkpoint.timeout` parameter is configured in the `env` of the
job configuration file, it will be subject to the setting in the job
configuration file.
+
+**min-pause**
+
+The minimum pause (in milliseconds) between consecutive checkpoints. This
ensures that checkpoints are not triggered too frequently.
+
Example
```yaml
@@ -141,6 +146,7 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
+ min-pause: 5000
```
**checkpoint storage**
diff --git a/docs/zh/seatunnel-engine/checkpoint-storage.md
b/docs/zh/seatunnel-engine/checkpoint-storage.md
index a60fdff5ae..030789f006 100644
--- a/docs/zh/seatunnel-engine/checkpoint-storage.md
+++ b/docs/zh/seatunnel-engine/checkpoint-storage.md
@@ -251,4 +251,3 @@ seatunnel:
disable.cache: false
fs.defaultFS: file:///
```
-
diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
index 664e6e3296..9dea1873bb 100644
--- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -89,6 +89,10 @@ seatunnel:
检查点的超时时间。如果在超时时间内无法完成检查点,则会触发检查点失败,作业失败。如果在作业的配置文件的`env`中配置了`checkpoint.timeout`参数,将以作业配置文件中设置的为准。
+**min-pause**
+
+连续检查点之间的最小暂停时间(以毫秒为单位),确保检查点不会频繁触发。
+
示例
```yaml
@@ -101,6 +105,7 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
+ min-pause: 5000
```
**checkpoint storage**
diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
index 8fa11e9174..bb0d2ca41e 100644
--- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -133,6 +133,10 @@ seatunnel:
检查点的超时时间。如果在超时时间内无法完成检查点,则会触发检查点失败,作业失败。如果在作业的配置文件的`env`中配置了`checkpoint.timeout`参数,将以作业配置文件中设置的为准。
+**min-pause**
+
+连续检查点之间的最小暂停时间(以毫秒为单位),确保检查点不会频繁触发。
+
示例
```yaml
@@ -145,6 +149,7 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
+ min-pause: 5000
```
**checkpoint storage**
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
index 663727caf8..0e4e1a7d63 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
@@ -84,6 +84,14 @@ public class EnvCommonOptions {
.noDefaultValue()
.withDescription("The timeout (in milliseconds) for a
checkpoint.");
+ public static Option<Integer> CHECKPOINT_MIN_PAUSE =
+ Options.key("min-pause")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The minimum pause (in milliseconds) between
consecutive checkpoints. "
+ + "This ensures that checkpoints are not
triggered too frequently.");
+
public static Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
Options.key("savemode.execute.location")
.enumType(SaveModeExecuteLocation.class)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
index 0c13cba4df..bb025aa4d2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
@@ -42,6 +42,7 @@ public class EnvOptionRule implements Factory {
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
+ EnvCommonOptions.CHECKPOINT_MIN_PAUSE,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index c81d0a3dba..39710b60b9 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -101,6 +101,9 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
} else if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
+ } else if
(config.hasPath(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key())) {
+ long minPause =
config.getLong(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key());
+ checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
}
if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_MODE)) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 925d242cc3..bb57ff3891 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -31,6 +31,7 @@ public class ConfigKeyName {
@Deprecated public static final String CHECKPOINT_INTERVAL =
"execution.checkpoint.interval";
@Deprecated public static final String CHECKPOINT_MODE =
"execution.checkpoint.mode";
@Deprecated public static final String CHECKPOINT_TIMEOUT =
"execution.checkpoint.timeout";
+ @Deprecated public static final String CHECKPOINT_MIN_PAUSE =
"execution.checkpoint.min-pause";
@Deprecated public static final String CHECKPOINT_DATA_URI =
"execution.checkpoint.data-uri";
@Deprecated
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index f4aec7dfc3..65355453dc 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -282,6 +282,14 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT
.key(),
getTextContent(node)));
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE
+ .key()
+ .equals(name)) {
+ checkpointConfig.setCheckpointMinPause(
+ getIntegerValue(
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE
+ .key(),
+ getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions
.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
.key()
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 2a1c959704..7b7031b7c2 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -32,6 +32,8 @@ public class CheckpointConfig implements Serializable {
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
private long checkpointTimeout =
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
+ private long checkpointMinPause =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE.defaultValue();
private long schemaChangeCheckpointTimeout =
ServerConfigOptions.MasterServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
.defaultValue();
@@ -55,6 +57,10 @@ public class CheckpointConfig implements Serializable {
this.checkpointTimeout = checkpointTimeout;
}
+ public void setCheckpointMinPause(long checkpointMinPause) {
+ this.checkpointMinPause = checkpointMinPause;
+ }
+
public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) {
checkArgument(
checkpointTimeout >= MINIMAL_CHECKPOINT_TIME,
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index ac14f3bcb2..a55eb76f02 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -122,6 +122,14 @@ public class ServerConfigOptions {
.defaultValue(30000)
.withDescription("The timeout (in milliseconds) for a
checkpoint.");
+ public static final Option<Integer> CHECKPOINT_MIN_PAUSE =
+ Options.key("min-pause")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The minimum pause (in milliseconds) between
consecutive checkpoints. "
+ + "This ensures that checkpoints are
not triggered too frequently.");
+
public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 957090494c..3295ae728a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -361,7 +361,8 @@ public class CheckpointCoordinator {
}
}
- private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
+ @VisibleForTesting
+ protected void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
if (completedCheckpoint != null) {
try {
LOG.info(
@@ -404,7 +405,9 @@ public class CheckpointCoordinator {
scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
}
- private void scheduleTriggerPendingCheckpoint(CheckpointType
checkpointType, long delayMills) {
+ @VisibleForTesting
+ protected void scheduleTriggerPendingCheckpoint(
+ CheckpointType checkpointType, long delayMills) {
scheduler.schedule(
() -> tryTriggerPendingCheckpoint(checkpointType),
delayMills,
@@ -514,6 +517,24 @@ public class CheckpointCoordinator {
checkpointType,
coordinatorConfig.getCheckpointInterval() - interval);
return;
}
+
+ if (latestCompletedCheckpoint != null
+ && coordinatorConfig.getCheckpointMinPause() != -1) {
+ long lastCompletedTime =
latestCompletedCheckpoint.getCompletedTimestamp();
+ long timeSinceLastCompleted = currentTimestamp -
lastCompletedTime;
+ if (timeSinceLastCompleted <
coordinatorConfig.getCheckpointMinPause()) {
+ long minPauseDelay =
+ coordinatorConfig.getCheckpointMinPause() -
timeSinceLastCompleted;
+ LOG.info(
+ "skip trigger checkpoint because the last
completed timestamp is {} and current timestamp is {}, the time since
completion ({} ms) is less than min-pause ({} ms).",
+ lastCompletedTime,
+ currentTimestamp,
+ timeSinceLastCompleted,
+ coordinatorConfig.getCheckpointMinPause());
+ scheduleTriggerPendingCheckpoint(checkpointType,
minPauseDelay);
+ return;
+ }
+ }
}
synchronized (lock) {
if (isCompleted() || isShutdown()) {
@@ -909,6 +930,7 @@ public class CheckpointCoordinator {
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
pendingCounter.decrementAndGet();
+
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index d579fea9ec..c39b3a2567 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -310,7 +310,7 @@ public class CheckpointManager {
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation
operation) {
log.debug(
- "Sead Operation : "
+ "Send Operation : "
+ operation.getClass().getSimpleName()
+ " to "
+ jobMaster.queryTaskGroupAddress(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f58141680d..a8922426cb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -333,6 +333,7 @@ public class JobMaster {
CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
+
jobCheckpointConfig.setCheckpointMinPause(defaultCheckpointConfig.getCheckpointMinPause());
CheckpointStorageConfig jobCheckpointStorageConfig = new
CheckpointStorageConfig();
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
@@ -357,6 +358,11 @@ public class JobMaster {
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
}
+ if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key())) {
+ jobCheckpointConfig.setCheckpointMinPause(
+ Long.parseLong(
+
jobEnv.get(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key()).toString()));
+ }
return jobCheckpointConfig;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index f0fb67a664..16d5fe1c5f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -27,12 +27,15 @@ import
org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -164,4 +167,117 @@ public class CheckpointCoordinatorTest
executorService.shutdownNow();
}
}
+
+ @Test
+ void testCheckpointMinPause()
+ throws CheckpointStorageException, ExecutionException,
InterruptedException,
+ TimeoutException {
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ checkpointConfig.setStorage(new CheckpointStorageConfig());
+ checkpointConfig.setCheckpointInterval(10000); // 10 seconds
+ checkpointConfig.setCheckpointMinPause(5000); // 5 seconds min-pause
+ checkpointConfig.setCheckpointTimeout(30000);
+
+ Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+ TaskLocation taskLocation = new TaskLocation(new TaskGroupLocation(1L,
1, 1), 1, 1);
+ planMap.put(
+ 1,
+ CheckpointPlan.builder()
+ .pipelineId(1)
+ .pipelineSubtasks(Collections.singleton(taskLocation))
+ .startingSubtasks(Collections.singleton(taskLocation))
+ .build());
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ JobMaster mockJobMaster = Mockito.mock(JobMaster.class);
+ Mockito.when(mockJobMaster.getJobId()).thenReturn(1L);
+ Mockito.when(mockJobMaster.isNeedRestore()).thenReturn(false);
+
Mockito.when(mockJobMaster.queryTaskGroupAddress(Mockito.any(TaskGroupLocation.class)))
+ .thenReturn(nodeEngine.getThisAddress());
+
+ // Simulate the scenario: checkpoint starts at 0s, completes at 8s,
next should trigger at
+ // 13s
+ Instant time0s = Instant.ofEpochMilli(0);
+ // Checkpoint completes at 8s
+ Instant time8s = Instant.ofEpochMilli(8000);
+ Instant time10s = Instant.ofEpochMilli(10000);
+
+ CompletedCheckpoint completedCheckpoint =
+ new CompletedCheckpoint(
+ 1L,
+ 1,
+ 1L,
+ time0s.toEpochMilli(), // triggerTimestamp (started at
0s)
+ CheckpointType.CHECKPOINT_TYPE,
+ time8s.toEpochMilli(), // completedTimestamp
(completed at 8s)
+ new HashMap<>(),
+ new HashMap<>());
+
+ try (MockedStatic<Instant> mockedInstant =
Mockito.mockStatic(Instant.class)) {
+ mockedInstant.when(Instant::now).thenReturn(time10s);
+
+ CheckpointManager checkpointManager =
+ new CheckpointManager(
+ 1L,
+ false,
+ nodeEngine,
+ mockJobMaster,
+ planMap,
+ checkpointConfig,
+
server.getCheckpointService().getCheckpointStorage(),
+ executorService,
+
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+
+ @Override
+ public void acknowledgeTask(TaskAcknowledgeOperation
ackOperation) {
+
mockedInstant.when(Instant::now).thenReturn(time8s);
+ super.acknowledgeTask(ackOperation);
+ }
+
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator(int pipelineId) {
+
+ CheckpointCoordinator originalCoordinator =
+ super.getCheckpointCoordinator(pipelineId);
+ CheckpointCoordinator spyCheckpointCoordinator =
+ Mockito.spy(originalCoordinator);
+ Mockito.doAnswer(
+ invocation -> {
+ Object argument =
invocation.getArgument(1);
+ Assertions.assertEquals(
+ 3000,
+
Integer.parseInt(argument.toString()),
+ "Checkpoint should be
delayed by exactly 3 seconds (from 10s to 13s)");
+ return
invocation.callRealMethod();
+ })
+ .when(spyCheckpointCoordinator)
+ .scheduleTriggerPendingCheckpoint(
+ Mockito.any(CheckpointType.class),
Mockito.anyLong());
+
+ Mockito.doReturn(new InvocationFuture[0])
+ .when(spyCheckpointCoordinator)
+
.notifyCheckpointCompleted(completedCheckpoint);
+ Mockito.doReturn(new InvocationFuture[0])
+ .when(spyCheckpointCoordinator)
+ .notifyCheckpointEnd(completedCheckpoint);
+
+ ReflectionUtils.setField(
+ spyCheckpointCoordinator,
+ "latestCompletedCheckpoint",
+ completedCheckpoint);
+
+ return spyCheckpointCoordinator;
+ }
+ };
+
+ ReflectionUtils.setField(
+ checkpointManager.getCheckpointCoordinator(1),
+ "latestTriggerTimestamp",
+ new AtomicLong(time0s.toEpochMilli()));
+ checkpointManager.reportedPipelineRunning(1, true);
+
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
}