This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1175aa908dd45260910d1255789a14eced68cb97
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sun Jan 18 21:03:40 2026 +0100

    [FLINK-38939] Minimize checkpoint trigger delay if sources are waiting for a checkpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  7 ++-
 ...ctivatedCheckpointCompletedCheckpointStore.java |  3 +-
 .../tasks/CheckpointCoordinatorConfiguration.java  | 20 +++++++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 52 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 24a0886c88c..bff3baa54c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -258,6 +258,8 @@ public class CheckpointCoordinator {
 
     private final long initialTriggeringDelay;
 
+    private long triggerDelay;
+
     // 
--------------------------------------------------------------------------------------------
 
     public CheckpointCoordinator(
@@ -376,6 +378,7 @@ public class CheckpointCoordinator {
         this.statsTracker = checkNotNull(statsTracker, "Statistic tracker can 
not be null");
         this.vertexFinishedStateCheckerFactory = 
checkNotNull(vertexFinishedStateCheckerFactory);
         this.initialTriggeringDelay = chkConfig.getInitialTriggeringDelay();
+        this.triggerDelay = initialTriggeringDelay;
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -2044,7 +2047,9 @@ public class CheckpointCoordinator {
             }
 
             periodicScheduling = true;
-            scheduleTriggerWithDelay(clock.relativeTimeMillis(), 
initialTriggeringDelay);
+            scheduleTriggerWithDelay(clock.relativeTimeMillis(), triggerDelay);
+            // minimize delay for subsequent restarts of checkpoint scheduler
+            triggerDelay = minPauseBetweenCheckpoints;
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
index 8552a1644af..c5584c05dad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -46,7 +47,7 @@ public enum DeactivatedCheckpointCompletedCheckpointStore 
implements CompletedCh
 
     @Override
     public List<CompletedCheckpoint> getAllCheckpoints() {
-        throw unsupportedOperationException();
+        return Collections.emptyList();
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index fccbc92f4ae..1f4702213e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -297,10 +297,22 @@ public class CheckpointCoordinatorConfiguration 
implements Serializable {
     }
 
     public long getInitialTriggeringDelay() {
-        return ThreadLocalRandom.current()
-                .nextLong(
-                        minPauseBetweenCheckpoints,
-                        checkpointInterval + (checkpointInterval == 
Long.MAX_VALUE ? 0L : 1L));
+        return pauseSourcesUntilFirstCheckpoint
+                // if sources are waiting for a checkpoint then trigger a 
checkpoint ASAP (not waiting
+                // for checkpoint interval) to unblock them; however, we still 
need to:
+                // 1. honor the min pause between checkpoints
+                // 2. avoid checkpointing all the jobs on this JM 
simultaneously (hence *2)
+                // 3. account for 0 configured min pause (hence +1)
+                ? ThreadLocalRandom.current()
+                        .nextLong(minPauseBetweenCheckpoints, 
minPauseBetweenCheckpoints * 2 + 1)
+                // otherwise, we can delay the first checkpoint by checkpoint 
interval
+                // additionally, increase the upper bound by 1 to avoid equal 
lower/upper bounds -
+                // unless it might cause an overflow
+                : ThreadLocalRandom.current()
+                        .nextLong(
+                                minPauseBetweenCheckpoints,
+                                checkpointInterval
+                                        + (checkpointInterval == 
Long.MAX_VALUE ? 0L : 1L));
     }
 
     /** {@link CheckpointCoordinatorConfiguration} builder. */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f64672fc329..2b5e6653dc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -92,6 +92,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.verification.VerificationMode;
 
 import javax.annotation.Nullable;
@@ -2571,6 +2573,44 @@ class CheckpointCoordinatorTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testTriggeringDelay(boolean pauseSourcesUntilFirstCheckpoint) throws 
Exception {
+        final long minPause = 100L;
+        final long interval = 10000L;
+        CheckpointCoordinatorConfiguration chkConfig =
+                new 
CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
+                        .setMinPauseBetweenCheckpoints(minPause)
+                        
.setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilFirstCheckpoint)
+                        .setCheckpointInterval(interval)
+                        .build();
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointCoordinatorConfiguration(chkConfig)
+                        .setCompletedCheckpointStore(new 
StandaloneCompletedCheckpointStore(2))
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+        // initial start and trigger - delay should be random
+        if (pauseSourcesUntilFirstCheckpoint) {
+            assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator))
+                    .as("initial trigger delay should be random between min 
pause and interval")
+                    .isBetween(minPause, minPause * 2);
+        } else {
+            assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator))
+                    .as("initial trigger delay should be random between min 
pause and interval")
+                    .isBetween(minPause, interval);
+        }
+
+        // restart scheduler and re-trigger - delay should be minimum (pause)
+        checkpointCoordinator.stopCheckpointScheduler();
+        manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator))
+                .as("subsequent trigger delay should be equal to min pause")
+                .isEqualTo(minPause);
+    }
+
     /** Tests that no minimum delay between savepoints is enforced. */
     @Test
     void testMinDelayBetweenSavepoints() throws Exception {
@@ -4515,4 +4555,16 @@ class CheckpointCoordinatorTest {
             super.reportCompletedCheckpoint(completed);
         }
     }
+
+    private long startSchedulerAndGetTriggerDelay(CheckpointCoordinator 
checkpointCoordinator) {
+        checkpointCoordinator.startCheckpointScheduler();
+        checkState(checkpointCoordinator.getNumberOfPendingCheckpoints() == 0);
+        long delay =
+                
Iterables.getOnlyElement(manuallyTriggeredScheduledExecutor.getAllScheduledTasks())
+                        .getDelay(TimeUnit.MILLISECONDS);
+        manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkState(checkpointCoordinator.getNumberOfPendingCheckpoints() >= 1);
+        return delay;
+    }
 }

Reply via email to