This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2b044dc66814498f0cdfb4e9249f446e2c6fce9
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Apr 4 18:38:46 2025 +0200

    [hotfix][runtime] Move checkpointing configuration code to CheckpointCoordinatorConfiguration
---
 .../runtime/checkpoint/CheckpointCoordinator.java | 27 +++++-----------------
 .../tasks/CheckpointCoordinatorConfiguration.java | 19 ++++++++++++++-
 2 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c86fe67ec1f..24a0886c88c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -79,7 +79,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
@@ -257,6 +256,8 @@ public class CheckpointCoordinator {
 
     private boolean forceFullSnapshot;
 
+    private final long initialTriggeringDelay;
+
     // --------------------------------------------------------------------------------------------
 
     public CheckpointCoordinator(
@@ -314,25 +315,12 @@ public class CheckpointCoordinator {
         // sanity checks
         checkNotNull(checkpointStorage);
 
-        // max "in between duration" can be one year - this is to prevent numeric overflows
-        long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
-        if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
-            minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
-        }
-
-        // it does not make sense to schedule checkpoints more often then the desired
-        // time between checkpoints
-        long baseInterval = chkConfig.getCheckpointInterval();
-        if (baseInterval < minPauseBetweenCheckpoints) {
-            baseInterval = minPauseBetweenCheckpoints;
-        }
-
         this.job = checkNotNull(job);
-        this.baseInterval = baseInterval;
+        this.baseInterval = chkConfig.getCheckpointInterval();
         this.baseIntervalDuringBacklog = chkConfig.getCheckpointIntervalDuringBacklog();
         this.nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;
         this.checkpointTimeout = chkConfig.getCheckpointTimeout();
-        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+        this.minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
         this.coordinatorsToCheckpoint =
                 Collections.unmodifiableCollection(coordinatorsToCheckpoint);
         this.pendingCheckpoints = new LinkedHashMap<>();
@@ -387,6 +375,7 @@ public class CheckpointCoordinator {
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
         this.statsTracker = checkNotNull(statsTracker, "Statistic tracker can not be null");
         this.vertexFinishedStateCheckerFactory = checkNotNull(vertexFinishedStateCheckerFactory);
+        this.initialTriggeringDelay = chkConfig.getInitialTriggeringDelay();
     }
 
     // --------------------------------------------------------------------------------------------
@@ -2055,7 +2044,7 @@ public class CheckpointCoordinator {
             }
 
             periodicScheduling = true;
-            scheduleTriggerWithDelay(clock.relativeTimeMillis(), getRandomInitDelay());
+            scheduleTriggerWithDelay(clock.relativeTimeMillis(), initialTriggeringDelay);
         }
     }
 
@@ -2117,10 +2106,6 @@ public class CheckpointCoordinator {
         }
     }
 
-    private long getRandomInitDelay() {
-        return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
-    }
-
     private void scheduleTriggerWithDelay(long currentRelativeTime, long initDelay) {
         nextCheckpointTriggeringRelativeTime = currentRelativeTime + initDelay;
         currentPeriodicTrigger = new ScheduledTrigger();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index c0f4c07bba4..fccbc92f4ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Configuration settings for the {@link CheckpointCoordinator}. This includes the checkpoint
@@ -140,10 +141,19 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
                 !isUnalignedCheckpointsEnabled || maxConcurrentCheckpoints <= 1,
                 "maxConcurrentCheckpoints can't be > 1 if UnalignedCheckpoints enabled");
 
+        // max "in between duration" can be one year - this is to prevent numeric overflows
+        if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+            minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+        }
+        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+        // it does not make sense to schedule checkpoints more often then the desired
+        // time between checkpoints
+        if (checkpointInterval < minPauseBetweenCheckpoints) {
+            checkpointInterval = minPauseBetweenCheckpoints;
+        }
         this.checkpointInterval = checkpointInterval;
         this.checkpointIntervalDuringBacklog = checkpointIntervalDuringBacklog;
         this.checkpointTimeout = checkpointTimeout;
-        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
         this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
         this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
         this.isExactlyOnce = isExactlyOnce;
@@ -286,6 +296,13 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
         return new CheckpointCoordinatorConfigurationBuilder();
     }
 
+    public long getInitialTriggeringDelay() {
+        return ThreadLocalRandom.current()
+                .nextLong(
+                        minPauseBetweenCheckpoints,
+                        checkpointInterval + (checkpointInterval == Long.MAX_VALUE ? 0L : 1L));
+    }
+
     /** {@link CheckpointCoordinatorConfiguration} builder. */
     public static class CheckpointCoordinatorConfigurationBuilder {
         private long checkpointInterval = MINIMAL_CHECKPOINT_TIME;
