This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b3c79d964 [Feature][Zeta] Support the min-pause configuration for 
checkpoints (#9804)
3b3c79d964 is described below

commit 3b3c79d96466f51eff80752fb348f48b0b499f44
Author: xiaochen <[email protected]>
AuthorDate: Mon Sep 8 18:37:08 2025 +0800

    [Feature][Zeta] Support the min-pause configuration for checkpoints (#9804)
---
 docs/en/seatunnel-engine/checkpoint-storage.md     |   1 -
 .../seatunnel-engine/hybrid-cluster-deployment.md  |   6 ++
 .../separated-cluster-deployment.md                |   6 ++
 docs/zh/seatunnel-engine/checkpoint-storage.md     |   1 -
 .../seatunnel-engine/hybrid-cluster-deployment.md  |   5 +
 .../separated-cluster-deployment.md                |   5 +
 .../seatunnel/api/options/EnvCommonOptions.java    |   8 ++
 .../seatunnel/api/options/EnvOptionRule.java       |   1 +
 .../execution/AbstractFlinkRuntimeEnvironment.java |   3 +
 .../core/starter/flink/utils/ConfigKeyName.java    |   1 +
 .../config/YamlSeaTunnelDomConfigProcessor.java    |   8 ++
 .../common/config/server/CheckpointConfig.java     |   6 ++
 .../common/config/server/ServerConfigOptions.java  |   8 ++
 .../server/checkpoint/CheckpointCoordinator.java   |  26 ++++-
 .../server/checkpoint/CheckpointManager.java       |   2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |   6 ++
 .../checkpoint/CheckpointCoordinatorTest.java      | 116 +++++++++++++++++++++
 17 files changed, 204 insertions(+), 5 deletions(-)

diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md 
b/docs/en/seatunnel-engine/checkpoint-storage.md
index 19c617e015..973777c0f7 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -278,4 +278,3 @@ seatunnel:
           disable.cache: false
           fs.defaultFS: file:///
 ```
-
diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md 
b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
index 7e09112589..474ebe3af3 100644
--- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -89,6 +89,10 @@ The interval between two checkpoints, in milliseconds. If 
the `checkpoint.interv
 
 The timeout for checkpoints. If the checkpoint cannot be completed within the 
timeout, a checkpoint failure will be triggered and the job will fail. If the 
`checkpoint.timeout` parameter is configured in the job configuration file's 
`env`, the one set in the job configuration file will be used.
 
+**min-pause**
+
+The minimum pause (in milliseconds) between consecutive checkpoints. This 
ensures that checkpoints are not triggered too frequently.
+
 Example
 
 ```yaml
@@ -101,6 +105,8 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
+            min-pause: 5000
+
 ```
 
 **checkpoint storage**
diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md 
b/docs/en/seatunnel-engine/separated-cluster-deployment.md
index 4afbc033ed..d8ab285324 100644
--- a/docs/en/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -129,6 +129,11 @@ The interval between two checkpoints, in milliseconds. If 
the `checkpoint.interv
 
 The timeout time of the checkpoint. If the checkpoint cannot be completed 
within the timeout time, it will trigger a checkpoint failure and the job 
fails. If the `checkpoint.timeout` parameter is configured in the `env` of the 
job configuration file, it will be subject to the setting in the job 
configuration file.
 
+
+**min-pause**
+
+The minimum pause (in milliseconds) between consecutive checkpoints. This 
ensures that checkpoints are not triggered too frequently.
+
 Example
 
 ```yaml
@@ -141,6 +146,7 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
+            min-pause: 5000
 ```
 
 **checkpoint storage**
diff --git a/docs/zh/seatunnel-engine/checkpoint-storage.md 
b/docs/zh/seatunnel-engine/checkpoint-storage.md
index a60fdff5ae..030789f006 100644
--- a/docs/zh/seatunnel-engine/checkpoint-storage.md
+++ b/docs/zh/seatunnel-engine/checkpoint-storage.md
@@ -251,4 +251,3 @@ seatunnel:
           disable.cache: false
           fs.defaultFS: file:/// 
 ```
-
diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md 
b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
index 664e6e3296..9dea1873bb 100644
--- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -89,6 +89,10 @@ seatunnel:
 
 
检查点的超时时间。如果在超时时间内无法完成检查点,则会触发检查点失败,作业失败。如果在作业的配置文件的`env`中配置了`checkpoint.timeout`参数,将以作业配置文件中设置的为准。
 
+**min-pause**
+
+连续检查点之间的最小暂停时间(以毫秒为单位),确保检查点不会频繁触发。
+
 示例
 
 ```yaml
@@ -101,6 +105,7 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
+            min-pause: 5000
 ```
 
 **checkpoint storage**
diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md 
b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
index 8fa11e9174..bb0d2ca41e 100644
--- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -133,6 +133,10 @@ seatunnel:
 
 
检查点的超时时间。如果在超时时间内无法完成检查点,则会触发检查点失败,作业失败。如果在作业的配置文件的`env`中配置了`checkpoint.timeout`参数,将以作业配置文件中设置的为准。
 
+**min-pause**
+
+连续检查点之间的最小暂停时间(以毫秒为单位),确保检查点不会频繁触发。
+
 示例
 
 ```yaml
@@ -145,6 +149,7 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
+            min-pause: 5000
 ```
 
 **checkpoint storage**
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
index 663727caf8..0e4e1a7d63 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java
@@ -84,6 +84,14 @@ public class EnvCommonOptions {
                     .noDefaultValue()
                     .withDescription("The timeout (in milliseconds) for a 
checkpoint.");
 
+    public static Option<Integer> CHECKPOINT_MIN_PAUSE =
+            Options.key("min-pause")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The minimum pause (in milliseconds) between 
consecutive checkpoints. "
+                                    + "This ensures that checkpoints are not 
triggered too frequently and provides.");
+
     public static Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
             Options.key("savemode.execute.location")
                     .enumType(SaveModeExecuteLocation.class)
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
index 0c13cba4df..bb025aa4d2 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java
@@ -42,6 +42,7 @@ public class EnvOptionRule implements Factory {
                         EnvCommonOptions.JARS,
                         EnvCommonOptions.CHECKPOINT_INTERVAL,
                         EnvCommonOptions.CHECKPOINT_TIMEOUT,
+                        EnvCommonOptions.CHECKPOINT_MIN_PAUSE,
                         EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
                         EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
                         EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index c81d0a3dba..39710b60b9 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -101,6 +101,9 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
         } else if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
             long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
             checkpointConfig.setCheckpointTimeout(timeout);
+        } else if 
(config.hasPath(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key())) {
+            long minPause = 
config.getLong(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key());
+            checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
         }
 
         if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_MODE)) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 925d242cc3..bb57ff3891 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -31,6 +31,7 @@ public class ConfigKeyName {
     @Deprecated public static final String CHECKPOINT_INTERVAL = 
"execution.checkpoint.interval";
     @Deprecated public static final String CHECKPOINT_MODE = 
"execution.checkpoint.mode";
     @Deprecated public static final String CHECKPOINT_TIMEOUT = 
"execution.checkpoint.timeout";
+    @Deprecated public static final String CHECKPOINT_MIN_PAUSE = 
"execution.checkpoint.min-pause";
     @Deprecated public static final String CHECKPOINT_DATA_URI = 
"execution.checkpoint.data-uri";
 
     @Deprecated
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index f4aec7dfc3..65355453dc 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -282,6 +282,14 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
                                 
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT
                                         .key(),
                                 getTextContent(node)));
+            } else if 
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE
+                    .key()
+                    .equals(name)) {
+                checkpointConfig.setCheckpointMinPause(
+                        getIntegerValue(
+                                
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE
+                                        .key(),
+                                getTextContent(node)));
             } else if (ServerConfigOptions.MasterServerConfigOptions
                     .SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
                     .key()
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 2a1c959704..7b7031b7c2 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -32,6 +32,8 @@ public class CheckpointConfig implements Serializable {
             
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
     private long checkpointTimeout =
             
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
+    private long checkpointMinPause =
+            
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE.defaultValue();
     private long schemaChangeCheckpointTimeout =
             
ServerConfigOptions.MasterServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
                     .defaultValue();
@@ -55,6 +57,10 @@ public class CheckpointConfig implements Serializable {
         this.checkpointTimeout = checkpointTimeout;
     }
 
+    public void setCheckpointMinPause(long checkpointMinPause) {
+        this.checkpointMinPause = checkpointMinPause;
+    }
+
     public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) {
         checkArgument(
                 checkpointTimeout >= MINIMAL_CHECKPOINT_TIME,
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index ac14f3bcb2..a55eb76f02 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -122,6 +122,14 @@ public class ServerConfigOptions {
                         .defaultValue(30000)
                         .withDescription("The timeout (in milliseconds) for a 
checkpoint.");
 
+        public static final Option<Integer> CHECKPOINT_MIN_PAUSE =
+                Options.key("min-pause")
+                        .intType()
+                        .defaultValue(-1)
+                        .withDescription(
+                                "The minimum pause (in milliseconds) between 
consecutive checkpoints. "
+                                        + "This ensures that checkpoints are 
not triggered too frequently and provides.");
+
         public static final Option<String> CHECKPOINT_STORAGE_TYPE =
                 Options.key("type")
                         .stringType()
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 957090494c..3295ae728a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -361,7 +361,8 @@ public class CheckpointCoordinator {
         }
     }
 
-    private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
+    @VisibleForTesting
+    protected void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
         if (completedCheckpoint != null) {
             try {
                 LOG.info(
@@ -404,7 +405,9 @@ public class CheckpointCoordinator {
         scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
     }
 
-    private void scheduleTriggerPendingCheckpoint(CheckpointType 
checkpointType, long delayMills) {
+    @VisibleForTesting
+    protected void scheduleTriggerPendingCheckpoint(
+            CheckpointType checkpointType, long delayMills) {
         scheduler.schedule(
                 () -> tryTriggerPendingCheckpoint(checkpointType),
                 delayMills,
@@ -514,6 +517,24 @@ public class CheckpointCoordinator {
                         checkpointType, 
coordinatorConfig.getCheckpointInterval() - interval);
                 return;
             }
+
+            if (latestCompletedCheckpoint != null
+                    && coordinatorConfig.getCheckpointMinPause() != -1) {
+                long lastCompletedTime = 
latestCompletedCheckpoint.getCompletedTimestamp();
+                long timeSinceLastCompleted = currentTimestamp - 
lastCompletedTime;
+                if (timeSinceLastCompleted < 
coordinatorConfig.getCheckpointMinPause()) {
+                    long minPauseDelay =
+                            coordinatorConfig.getCheckpointMinPause() - 
timeSinceLastCompleted;
+                    LOG.info(
+                            "skip trigger checkpoint because the last 
completed timestamp is {} and current timestamp is {}, the time since 
completion ({} ms) is less than min-pause ({} ms).",
+                            lastCompletedTime,
+                            currentTimestamp,
+                            timeSinceLastCompleted,
+                            coordinatorConfig.getCheckpointMinPause());
+                    scheduleTriggerPendingCheckpoint(checkpointType, 
minPauseDelay);
+                    return;
+                }
+            }
         }
         synchronized (lock) {
             if (isCompleted() || isShutdown()) {
@@ -909,6 +930,7 @@ public class CheckpointCoordinator {
         notifyCompleted(completedCheckpoint);
         
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
         pendingCounter.decrementAndGet();
+
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
             if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index d579fea9ec..c39b3a2567 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -310,7 +310,7 @@ public class CheckpointManager {
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation 
operation) {
         log.debug(
-                "Sead Operation : "
+                "Send Operation : "
                         + operation.getClass().getSimpleName()
                         + " to "
                         + jobMaster.queryTaskGroupAddress(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f58141680d..a8922426cb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -333,6 +333,7 @@ public class JobMaster {
         CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
         
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
         
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
+        
jobCheckpointConfig.setCheckpointMinPause(defaultCheckpointConfig.getCheckpointMinPause());
 
         CheckpointStorageConfig jobCheckpointStorageConfig = new 
CheckpointStorageConfig();
         
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
@@ -357,6 +358,11 @@ public class JobMaster {
                     Long.parseLong(
                             
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
         }
+        if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key())) {
+            jobCheckpointConfig.setCheckpointMinPause(
+                    Long.parseLong(
+                            
jobEnv.get(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key()).toString()));
+        }
         return jobCheckpointConfig;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index f0fb67a664..16d5fe1c5f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -27,12 +27,15 @@ import 
org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -164,4 +167,117 @@ public class CheckpointCoordinatorTest
             executorService.shutdownNow();
         }
     }
+
+    @Test
+    void testCheckpointMinPause()
+            throws CheckpointStorageException, ExecutionException, 
InterruptedException,
+                    TimeoutException {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.setStorage(new CheckpointStorageConfig());
+        checkpointConfig.setCheckpointInterval(10000); // 10 seconds
+        checkpointConfig.setCheckpointMinPause(5000); // 5 seconds min-pause
+        checkpointConfig.setCheckpointTimeout(30000);
+
+        Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+        TaskLocation taskLocation = new TaskLocation(new TaskGroupLocation(1L, 
1, 1), 1, 1);
+        planMap.put(
+                1,
+                CheckpointPlan.builder()
+                        .pipelineId(1)
+                        .pipelineSubtasks(Collections.singleton(taskLocation))
+                        .startingSubtasks(Collections.singleton(taskLocation))
+                        .build());
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        JobMaster mockJobMaster = Mockito.mock(JobMaster.class);
+        Mockito.when(mockJobMaster.getJobId()).thenReturn(1L);
+        Mockito.when(mockJobMaster.isNeedRestore()).thenReturn(false);
+        
Mockito.when(mockJobMaster.queryTaskGroupAddress(Mockito.any(TaskGroupLocation.class)))
+                .thenReturn(nodeEngine.getThisAddress());
+
+        // Simulate the scenario: checkpoint starts at 0s, completes at 8s, 
next should trigger at
+        // 13s
+        Instant time0s = Instant.ofEpochMilli(0);
+        // Checkpoint completes at 8s
+        Instant time8s = Instant.ofEpochMilli(8000);
+        Instant time10s = Instant.ofEpochMilli(10000);
+
+        CompletedCheckpoint completedCheckpoint =
+                new CompletedCheckpoint(
+                        1L,
+                        1,
+                        1L,
+                        time0s.toEpochMilli(), // triggerTimestamp (started at 
0s)
+                        CheckpointType.CHECKPOINT_TYPE,
+                        time8s.toEpochMilli(), // completedTimestamp 
(completed at 8s)
+                        new HashMap<>(),
+                        new HashMap<>());
+
+        try (MockedStatic<Instant> mockedInstant = 
Mockito.mockStatic(Instant.class)) {
+            mockedInstant.when(Instant::now).thenReturn(time10s);
+
+            CheckpointManager checkpointManager =
+                    new CheckpointManager(
+                            1L,
+                            false,
+                            nodeEngine,
+                            mockJobMaster,
+                            planMap,
+                            checkpointConfig,
+                            
server.getCheckpointService().getCheckpointStorage(),
+                            executorService,
+                            
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+
+                        @Override
+                        public void acknowledgeTask(TaskAcknowledgeOperation 
ackOperation) {
+                            
mockedInstant.when(Instant::now).thenReturn(time8s);
+                            super.acknowledgeTask(ackOperation);
+                        }
+
+                        @Override
+                        public CheckpointCoordinator 
getCheckpointCoordinator(int pipelineId) {
+
+                            CheckpointCoordinator originalCoordinator =
+                                    super.getCheckpointCoordinator(pipelineId);
+                            CheckpointCoordinator spyCheckpointCoordinator =
+                                    Mockito.spy(originalCoordinator);
+                            Mockito.doAnswer(
+                                            invocation -> {
+                                                Object argument = 
invocation.getArgument(1);
+                                                Assertions.assertEquals(
+                                                        3000,
+                                                        
Integer.parseInt(argument.toString()),
+                                                        "Checkpoint should be 
delayed by exactly 3 seconds (from 10s to 13s)");
+                                                return 
invocation.callRealMethod();
+                                            })
+                                    .when(spyCheckpointCoordinator)
+                                    .scheduleTriggerPendingCheckpoint(
+                                            Mockito.any(CheckpointType.class), 
Mockito.anyLong());
+
+                            Mockito.doReturn(new InvocationFuture[0])
+                                    .when(spyCheckpointCoordinator)
+                                    
.notifyCheckpointCompleted(completedCheckpoint);
+                            Mockito.doReturn(new InvocationFuture[0])
+                                    .when(spyCheckpointCoordinator)
+                                    .notifyCheckpointEnd(completedCheckpoint);
+
+                            ReflectionUtils.setField(
+                                    spyCheckpointCoordinator,
+                                    "latestCompletedCheckpoint",
+                                    completedCheckpoint);
+
+                            return spyCheckpointCoordinator;
+                        }
+                    };
+
+            ReflectionUtils.setField(
+                    checkpointManager.getCheckpointCoordinator(1),
+                    "latestTriggerTimestamp",
+                    new AtomicLong(time0s.toEpochMilli()));
+            checkpointManager.reportedPipelineRunning(1, true);
+
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
 }

Reply via email to