This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2b044dc66814498f0cdfb4e9249f446e2c6fce9
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Apr 4 18:38:46 2025 +0200

    [hotfix][runtime] Move checkpointing configuration code to 
CheckpointCoordinatorConfiguration
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 27 +++++-----------------
 .../tasks/CheckpointCoordinatorConfiguration.java  | 19 ++++++++++++++-
 2 files changed, 24 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c86fe67ec1f..24a0886c88c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -79,7 +79,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
@@ -257,6 +256,8 @@ public class CheckpointCoordinator {
 
     private boolean forceFullSnapshot;
 
+    private final long initialTriggeringDelay;
+
     // 
--------------------------------------------------------------------------------------------
 
     public CheckpointCoordinator(
@@ -314,25 +315,12 @@ public class CheckpointCoordinator {
         // sanity checks
         checkNotNull(checkpointStorage);
 
-        // max "in between duration" can be one year - this is to prevent 
numeric overflows
-        long minPauseBetweenCheckpoints = 
chkConfig.getMinPauseBetweenCheckpoints();
-        if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
-            minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
-        }
-
-        // it does not make sense to schedule checkpoints more often then the 
desired
-        // time between checkpoints
-        long baseInterval = chkConfig.getCheckpointInterval();
-        if (baseInterval < minPauseBetweenCheckpoints) {
-            baseInterval = minPauseBetweenCheckpoints;
-        }
-
         this.job = checkNotNull(job);
-        this.baseInterval = baseInterval;
+        this.baseInterval = chkConfig.getCheckpointInterval();
         this.baseIntervalDuringBacklog = 
chkConfig.getCheckpointIntervalDuringBacklog();
         this.nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;
         this.checkpointTimeout = chkConfig.getCheckpointTimeout();
-        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+        this.minPauseBetweenCheckpoints = 
chkConfig.getMinPauseBetweenCheckpoints();
         this.coordinatorsToCheckpoint =
                 Collections.unmodifiableCollection(coordinatorsToCheckpoint);
         this.pendingCheckpoints = new LinkedHashMap<>();
@@ -387,6 +375,7 @@ public class CheckpointCoordinator {
                         
this.checkpointsCleaner::getNumberOfCheckpointsToClean);
         this.statsTracker = checkNotNull(statsTracker, "Statistic tracker can 
not be null");
         this.vertexFinishedStateCheckerFactory = 
checkNotNull(vertexFinishedStateCheckerFactory);
+        this.initialTriggeringDelay = chkConfig.getInitialTriggeringDelay();
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -2055,7 +2044,7 @@ public class CheckpointCoordinator {
             }
 
             periodicScheduling = true;
-            scheduleTriggerWithDelay(clock.relativeTimeMillis(), 
getRandomInitDelay());
+            scheduleTriggerWithDelay(clock.relativeTimeMillis(), 
initialTriggeringDelay);
         }
     }
 
@@ -2117,10 +2106,6 @@ public class CheckpointCoordinator {
         }
     }
 
-    private long getRandomInitDelay() {
-        return 
ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 
1L);
-    }
-
     private void scheduleTriggerWithDelay(long currentRelativeTime, long 
initDelay) {
         nextCheckpointTriggeringRelativeTime = currentRelativeTime + initDelay;
         currentPeriodicTrigger = new ScheduledTrigger();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index c0f4c07bba4..fccbc92f4ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Configuration settings for the {@link CheckpointCoordinator}. This includes 
the checkpoint
@@ -140,10 +141,19 @@ public class CheckpointCoordinatorConfiguration 
implements Serializable {
                 !isUnalignedCheckpointsEnabled || maxConcurrentCheckpoints <= 
1,
                 "maxConcurrentCheckpoints can't be > 1 if UnalignedCheckpoints 
enabled");
 
+        // max "in between duration" can be one year - this is to prevent 
numeric overflows
+        if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+            minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+        }
+        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+        // it does not make sense to schedule checkpoints more often then the 
desired
+        // time between checkpoints
+        if (checkpointInterval < minPauseBetweenCheckpoints) {
+            checkpointInterval = minPauseBetweenCheckpoints;
+        }
         this.checkpointInterval = checkpointInterval;
         this.checkpointIntervalDuringBacklog = checkpointIntervalDuringBacklog;
         this.checkpointTimeout = checkpointTimeout;
-        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
         this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
         this.checkpointRetentionPolicy = 
Preconditions.checkNotNull(checkpointRetentionPolicy);
         this.isExactlyOnce = isExactlyOnce;
@@ -286,6 +296,13 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         return new CheckpointCoordinatorConfigurationBuilder();
     }
 
+    public long getInitialTriggeringDelay() {
+        return ThreadLocalRandom.current()
+                .nextLong(
+                        minPauseBetweenCheckpoints,
+                        checkpointInterval + (checkpointInterval == 
Long.MAX_VALUE ? 0L : 1L));
+    }
+
     /** {@link CheckpointCoordinatorConfiguration} builder. */
     public static class CheckpointCoordinatorConfigurationBuilder {
         private long checkpointInterval = MINIMAL_CHECKPOINT_TIME;

Reply via email to