Repository: flink Updated Branches: refs/heads/master f11352167 -> 9f736d192
[FLINK-9352] Choose initial checkpoint delay randomly to reduce I/O pressure By choosing the initial checkpoint delay randomly from [minPauseBetweenCheckpoints, baseInterval], we prevent multiple restarting jobs from taking synchronized checkpoints, which would otherwise cause significant I/O pressure. This closes #6092. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f736d19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f736d19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f736d19 Branch: refs/heads/master Commit: 9f736d1927c62d220a82931c4f5ffa4955910f27 Parents: f113521 Author: yanghua <yanghua1...@gmail.com> Authored: Tue May 29 15:59:48 2018 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Jul 7 11:10:08 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f736d19/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 55e1ffe..803b2ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -133,7 +134,7 @@ public class CheckpointCoordinator { /** The max time (in ms) that a checkpoint may take */ private final long checkpointTimeout; - /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to + /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to * enforce minimum processing time between checkpoint attempts */ private final long minPauseBetweenCheckpointsNanos; @@ -1173,9 +1174,10 @@ public class CheckpointCoordinator { stopCheckpointScheduler(); periodicScheduling = true; + long initialDelay = ThreadLocalRandom.current().nextLong( + minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L); currentPeriodicTrigger = timer.scheduleAtFixedRate( - new ScheduledTrigger(), - baseInterval, baseInterval, TimeUnit.MILLISECONDS); + new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS); } }