This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1175aa908dd45260910d1255789a14eced68cb97 Author: Roman Khachatryan <[email protected]> AuthorDate: Sun Jan 18 21:03:40 2026 +0100 [FLINK-38939] Minimize checkpoint trigger delay if sources are waiting for a checkpoint --- .../runtime/checkpoint/CheckpointCoordinator.java | 7 ++- ...ctivatedCheckpointCompletedCheckpointStore.java | 3 +- .../tasks/CheckpointCoordinatorConfiguration.java | 20 +++++++-- .../checkpoint/CheckpointCoordinatorTest.java | 52 ++++++++++++++++++++++ 4 files changed, 76 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 24a0886c88c..bff3baa54c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -258,6 +258,8 @@ public class CheckpointCoordinator { private final long initialTriggeringDelay; + private long triggerDelay; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -376,6 +378,7 @@ public class CheckpointCoordinator { this.statsTracker = checkNotNull(statsTracker, "Statistic tracker can not be null"); this.vertexFinishedStateCheckerFactory = checkNotNull(vertexFinishedStateCheckerFactory); this.initialTriggeringDelay = chkConfig.getInitialTriggeringDelay(); + this.triggerDelay = initialTriggeringDelay; } // -------------------------------------------------------------------------------------------- @@ -2044,7 +2047,9 @@ public class CheckpointCoordinator { } periodicScheduling = true; - scheduleTriggerWithDelay(clock.relativeTimeMillis(), initialTriggeringDelay); + scheduleTriggerWithDelay(clock.relativeTimeMillis(), triggerDelay); + // minimize delay for subsequent restarts of checkpoint scheduler + triggerDelay = 
minPauseBetweenCheckpoints; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java index 8552a1644af..c5584c05dad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.state.SharedStateRegistry; +import java.util.Collections; import java.util.List; /** @@ -46,7 +47,7 @@ public enum DeactivatedCheckpointCompletedCheckpointStore implements CompletedCh @Override public List<CompletedCheckpoint> getAllCheckpoints() { - throw unsupportedOperationException(); + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index fccbc92f4ae..1f4702213e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -297,10 +297,22 @@ public class CheckpointCoordinatorConfiguration implements Serializable { } public long getInitialTriggeringDelay() { - return ThreadLocalRandom.current() - .nextLong( - minPauseBetweenCheckpoints, - checkpointInterval + (checkpointInterval == Long.MAX_VALUE ? 
0L : 1L)); + return pauseSourcesUntilFirstCheckpoint + // if sources are waiting for a checkpoint then trigger a checkpoint ASAP (not waiting + // for checkpoint interval) to unblock them; however, we still need to: + // 1. honor the min pause between checkpoints + // 2. avoid checkpointing all the jobs on this JM simultaneously (hence *2) + // 3. account for 0 configured min pause (hence +1) + ? ThreadLocalRandom.current() + .nextLong(minPauseBetweenCheckpoints, minPauseBetweenCheckpoints * 2 + 1) + // otherwise, we can delay the first checkpoint by checkpoint interval + // additionally, increase the upper bound by 1 to avoid equal lower/upper bounds - + // unless it might cause an overflow + : ThreadLocalRandom.current() + .nextLong( + minPauseBetweenCheckpoints, + checkpointInterval + + (checkpointInterval == Long.MAX_VALUE ? 0L : 1L)); } /** {@link CheckpointCoordinatorConfiguration} builder. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index f64672fc329..2b5e6653dc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -92,6 +92,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.verification.VerificationMode; import javax.annotation.Nullable; @@ -2571,6 +2573,44 @@ class CheckpointCoordinatorTest { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testTriggeringDelay(boolean pauseSourcesUntilFirstCheckpoint) throws Exception { + final long minPause = 100L; + final long 
interval = 10000L; + CheckpointCoordinatorConfiguration chkConfig = + new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder() + .setMinPauseBetweenCheckpoints(minPause) + .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilFirstCheckpoint) + .setCheckpointInterval(interval) + .build(); + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointCoordinatorConfiguration(chkConfig) + .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(EXECUTOR_RESOURCE.getExecutor()); + // initial start and trigger - delay should be random + if (pauseSourcesUntilFirstCheckpoint) { + assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator)) + .as("initial trigger delay should be random between min pause and interval") + .isBetween(minPause, minPause * 2); + } else { + assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator)) + .as("initial trigger delay should be random between min pause and interval") + .isBetween(minPause, interval); + } + + // restart scheduler and re-trigger - delay should be minimum (pause) + checkpointCoordinator.stopCheckpointScheduler(); + manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(); + manuallyTriggeredScheduledExecutor.triggerAll(); + + assertThat(startSchedulerAndGetTriggerDelay(checkpointCoordinator)) + .as("subsequent trigger delay should be equal to min pause") + .isEqualTo(minPause); + } + /** Tests that no minimum delay between savepoints is enforced. 
*/ @Test void testMinDelayBetweenSavepoints() throws Exception { @@ -4515,4 +4555,16 @@ class CheckpointCoordinatorTest { super.reportCompletedCheckpoint(completed); } } + + private long startSchedulerAndGetTriggerDelay(CheckpointCoordinator checkpointCoordinator) { + checkpointCoordinator.startCheckpointScheduler(); + checkState(checkpointCoordinator.getNumberOfPendingCheckpoints() == 0); + long delay = + Iterables.getOnlyElement(manuallyTriggeredScheduledExecutor.getAllScheduledTasks()) + .getDelay(TimeUnit.MILLISECONDS); + manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(); + manuallyTriggeredScheduledExecutor.triggerAll(); + checkState(checkpointCoordinator.getNumberOfPendingCheckpoints() >= 1); + return delay; + } }
