Repository: flink
Updated Branches:
  refs/heads/master f11352167 -> 9f736d192


[FLINK-9352] Choose initial checkpoint delay randomly to reduce I/O pressure

By choosing the initial checkpoint delay randomly from
[minPauseBetweenCheckpoints, baseInterval], we prevent multiple restarting
jobs from having synchronized checkpoints, which could otherwise cause
significant I/O pressure.

This closes #6092.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f736d19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f736d19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f736d19

Branch: refs/heads/master
Commit: 9f736d1927c62d220a82931c4f5ffa4955910f27
Parents: f113521
Author: yanghua <yanghua1...@gmail.com>
Authored: Tue May 29 15:59:48 2018 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Jul 7 11:10:08 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java      | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f736d19/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..803b2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -133,7 +134,7 @@ public class CheckpointCoordinator {
        /** The max time (in ms) that a checkpoint may take */
        private final long checkpointTimeout;
 
-       /** The min time(in ms) to delay after a checkpoint could be triggered. 
Allows to
+       /** The min time(in ns) to delay after a checkpoint could be triggered. 
Allows to
         * enforce minimum processing time between checkpoint attempts */
        private final long minPauseBetweenCheckpointsNanos;
 
@@ -1173,9 +1174,10 @@ public class CheckpointCoordinator {
                        stopCheckpointScheduler();
 
                        periodicScheduling = true;
+                       long initialDelay = 
ThreadLocalRandom.current().nextLong(
+                               minPauseBetweenCheckpointsNanos / 1_000_000L, 
baseInterval + 1L);
                        currentPeriodicTrigger = timer.scheduleAtFixedRate(
-                                       new ScheduledTrigger(),
-                                       baseInterval, baseInterval, 
TimeUnit.MILLISECONDS);
+                                       new ScheduledTrigger(), initialDelay, 
baseInterval, TimeUnit.MILLISECONDS);
                }
        }
 

Reply via email to