Repository: flink
Updated Branches:
  refs/heads/master 7d66aaeb0 -> b181662be


[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b181662b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b181662b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b181662b

Branch: refs/heads/master
Commit: b181662be378652d6c4405841ccda6145968d145
Parents: 7d66aae
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:55:27 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  45 ++++----
 .../checkpoint/CheckpointCoordinatorTest.java   | 108 +++++++++----------
 2 files changed, 74 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2242c14..8ca4b2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
 
        /** The min time(in ms) to delay after a checkpoint could be triggered. 
Allows to
         * enforce minimum processing time between checkpoint attempts */
-       private final long minPauseBetweenCheckpoints;
+       private final long minPauseBetweenCheckpointsNanos;
 
        /** The maximum number of checkpoints that may be in progress at the 
same time */
        private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
 
        private ScheduledTrigger currentPeriodicTrigger;
 
-       private long lastTriggeredCheckpoint;
+       /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
+       private long lastCheckpointCompletionNanos;
 
        /** Flag whether a triggered checkpoint should immediately schedule the 
next checkpoint.
         * Non-volatile, because only accessed in synchronized scope */
@@ -184,6 +185,11 @@ public class CheckpointCoordinator {
                                        "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
                }
 
+               // max "in between duration" can be one year - this is to 
prevent numeric overflows
+               if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+                       minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 
1_000;
+               }
+
                // it does not make sense to schedule checkpoints more often 
then the desired
                // time between checkpoints
                if (baseInterval < minPauseBetweenCheckpoints) {
@@ -193,7 +199,7 @@ public class CheckpointCoordinator {
                this.job = checkNotNull(job);
                this.baseInterval = baseInterval;
                this.checkpointTimeout = checkpointTimeout;
-               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
                this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -352,13 +358,10 @@ public class CheckpointCoordinator {
                                }
 
                                // make sure the minimum interval between 
checkpoints has passed
-                               long nextCheckpointEarliest = 
lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
-                               if (nextCheckpointEarliest < 0) {
-                                       // overflow
-                                       nextCheckpointEarliest = Long.MAX_VALUE;
-                               }
+                               final long earliestNext = 
lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+                               final long durationTillNextMillis = 
(earliestNext - System.nanoTime()) / 1_000_000;
 
-                               if (nextCheckpointEarliest > timestamp) {
+                               if (durationTillNextMillis > 0) {
                                        if (currentPeriodicTrigger != null) {
                                                currentPeriodicTrigger.cancel();
                                                currentPeriodicTrigger = null;
@@ -366,8 +369,7 @@ public class CheckpointCoordinator {
                                        ScheduledTrigger trigger = new 
ScheduledTrigger();
                                        // Reassign the new trigger to the 
currentPeriodicTrigger
                                        currentPeriodicTrigger = trigger;
-                                       long delay = nextCheckpointEarliest - 
timestamp;
-                                       timer.scheduleAtFixedRate(trigger, 
delay, baseInterval);
+                                       timer.scheduleAtFixedRate(trigger, 
durationTillNextMillis, baseInterval);
                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                }
                        }
@@ -475,29 +477,25 @@ public class CheckpointCoordinator {
                                                }
 
                                                // make sure the minimum 
interval between checkpoints has passed
-                                               long nextCheckpointEarliest = 
lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
-                                               if (nextCheckpointEarliest < 0) 
{
-                                                       // overflow
-                                                       nextCheckpointEarliest 
= Long.MAX_VALUE;
-                                               }
+                                               final long earliestNext = 
lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+                                               final long 
durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
 
-                                               if (nextCheckpointEarliest > 
timestamp) {
+                                               if (durationTillNextMillis > 0) 
{
                                                        if 
(currentPeriodicTrigger != null) {
                                                                
currentPeriodicTrigger.cancel();
                                                                
currentPeriodicTrigger = null;
                                                        }
+
                                                        ScheduledTrigger 
trigger = new ScheduledTrigger();
                                                        // Reassign the new 
trigger to the currentPeriodicTrigger
                                                        currentPeriodicTrigger 
= trigger;
-                                                       long delay = 
nextCheckpointEarliest - timestamp;
-                                                       
timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+                                                       
timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                                }
                                        }
 
                                        LOG.info("Triggering checkpoint " + 
checkpointID + " @ " + timestamp);
 
-                                       lastTriggeredCheckpoint = 
Math.max(timestamp, lastTriggeredCheckpoint);
                                        pendingCheckpoints.put(checkpointID, 
checkpoint);
                                        timer.schedule(canceller, 
checkpointTimeout);
                                }
@@ -644,8 +642,13 @@ public class CheckpointCoordinator {
                                switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), 
message.getSubtaskState())) {
                                        case SUCCESS:
                                                if 
(checkpoint.isFullyAcknowledged()) {
-                                                       completed = 
checkpoint.finalizeCheckpoint();
 
+                                                       // record the time when 
this was completed, to calculate
+                                                       // the 'min delay 
between checkpoints'
+                                                       
lastCheckpointCompletionNanos = System.nanoTime();
+
+                                                       // complete the 
checkpoint structure
+                                                       completed = 
checkpoint.finalizeCheckpoint();
                                                        
completedCheckpointStore.addCheckpoint(completed);
 
                                                        LOG.info("Completed 
checkpoint " + checkpointId + " (in " +

http://git-wip-us.apache.org/repos/asf/flink/blob/b181662b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 8e46f4c..73eebcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -70,7 +70,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1229,82 +1231,72 @@ public class CheckpointCoordinatorTest {
         * another is triggered.
         */
        @Test
-       public void testMinInterval() {
-               try {
-                       final JobID jid = new JobID();
+       public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+               final JobID jid = new JobID();
 
-                       // create some mock execution vertices and trigger some 
checkpoint
-                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
-                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+               // create some mock execution vertices and trigger some 
checkpoint
+               final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+               final ExecutionVertex vertex = mockExecutionVertex(attemptID);
+               final Execution executionAttempt = 
vertex.getCurrentExecutionAttempt();
 
-                       final AtomicInteger numCalls = new AtomicInteger();
-                       final Execution execution = 
vertex1.getCurrentExecutionAttempt();
+               final BlockingQueue<Long> triggerCalls = new 
LinkedBlockingQueue<>();
 
-                       doAnswer(new Answer<Void>() {
-                               @Override
-                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                       numCalls.incrementAndGet();
-                                       return null;
-                               }
-                       }).when(execution).triggerCheckpoint(anyLong(), 
anyLong());
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                               triggerCalls.add((Long) 
invocation.getArguments()[0]);
+                               return null;
+                       }
+               }).when(executionAttempt).triggerCheckpoint(anyLong(), 
anyLong());
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
+               final long delay = 50;
+
+               final CheckpointCoordinator coord = new CheckpointCoordinator(
                                jid,
-                               10,        // periodic interval is 10 ms
-                               200000,    // timeout is very long (200 s)
-                               500,    // 500ms delay between checkpoints
-                               10,
+                               2,           // periodic interval is 2 ms
+                               200_000,     // timeout is very long (200 s)
+                               delay,       // 50 ms delay between checkpoints
+                               1,
                                ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex },
+                               new ExecutionVertex[] { vertex },
+                               new ExecutionVertex[] { vertex },
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
-                               null,
+                               "dummy-path",
                                new DisabledCheckpointStatsTracker(),
                                Executors.directExecutor());
 
+               try {
                        coord.startCheckpointScheduler();
 
-                       //wait until the first checkpoint was triggered
-                       for (int x=0; x<20; x++) {
-                               Thread.sleep(100);
-                               if (numCalls.get() > 0) {
-                                       break;
-                               }
-                       }
+                       // wait until the first checkpoint was triggered
+                       Long firstCallId = triggerCalls.take();
+                       assertEquals(1L, firstCallId.longValue());
 
-                       if (numCalls.get() == 0) {
-                               fail("No checkpoint was triggered within the 
first 2000 ms.");
-                       }
-                       
-                       long start = System.currentTimeMillis();
-
-                       for (int x = 0; x < 20; x++) {
-                               Thread.sleep(100);
-                               int triggeredCheckpoints = numCalls.get();
-                               long curT = System.currentTimeMillis();
-
-                               /**
-                                * Within a given time-frame T only T/500 
checkpoints may be triggered due to the configured minimum
-                                * interval between checkpoints. This value 
however does not not take the first triggered checkpoint
-                                * into account (=> +1). Furthermore we have to 
account for the mis-alignment between checkpoints
-                                * being triggered and our time measurement (=> 
+1); for T=1200 a total of 3-4 checkpoints may have been
-                                * triggered depending on whether the end of 
the minimum interval for the first checkpoints ends before
-                                * or after T=200.
-                                */
-                               long maxAllowedCheckpoints = (curT - start) / 
500 + 2;
-                               assertTrue(maxAllowedCheckpoints >= 
triggeredCheckpoints);
-                       }
+                       AcknowledgeCheckpoint ackMsg = new 
AcknowledgeCheckpoint(
+                                       jid, attemptID, new 
CheckpointMetaData(1L, System.currentTimeMillis()));
 
-                       coord.stopCheckpointScheduler();
+                       // tell the coordinator that the checkpoint is done
+                       final long ackTime = System.nanoTime();
+                       coord.receiveAcknowledgeMessage(ackMsg);
+
+                       // wait until the next checkpoint is triggered
+                       Long nextCallId = triggerCalls.take();
+                       final long nextCheckpointTime = System.nanoTime();
+                       assertEquals(2L, nextCallId.longValue());
+
+                       final long delayMillis = (nextCheckpointTime - ackTime) 
/ 1_000_000;
 
+                       // we need to add one ms here to account for rounding 
errors
+                       if (delayMillis + 1 < delay) {
+                               fail("checkpoint came too early: delay was " + 
delayMillis + " but should have been at least " + delay);
+                       }
+               }
+               finally {
+                       coord.stopCheckpointScheduler();
                        coord.shutdown(JobStatus.FINISHED);
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }               
        }
 
        @Test

Reply via email to