Repository: flink
Updated Branches:
  refs/heads/release-1.1 7b5d769ad -> 59f61bf6c


[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint completion


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e475eb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e475eb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e475eb2d

Branch: refs/heads/release-1.1
Commit: e475eb2d9705b8948ce862f25adf91e25a4948b0
Parents: 7b5d769
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 14:01:40 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  43 ++++--
 .../checkpoint/CheckpointCoordinatorTest.java   | 133 +++++++++----------
 2 files changed, 97 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 24cc3cb..0d09922 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
 
        /** The min time(in ms) to delay after a checkpoint could be triggered. 
Allows to
         * enforce minimum processing time between checkpoint attempts */
-       private final long minPauseBetweenCheckpoints;
+       private final long minPauseBetweenCheckpointsNanos;
 
        /** The maximum number of checkpoints that may be in progress at the 
same time */
        private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
 
        private ScheduledTrigger currentPeriodicTrigger;
 
-       private long lastTriggeredCheckpoint;
+       /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
+       private long lastCheckpointCompletionNanos;
 
        /** Flag whether a triggered checkpoint should immediately schedule the 
next checkpoint.
         * Non-volatile, because only accessed in synchronized scope */
@@ -201,10 +202,16 @@ public class CheckpointCoordinator {
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
                checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
 
+               // max "in between duration" can be one year - this is to 
prevent numeric overflows
+               if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+                       minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 
1_000;
+                       LOG.warn("Reducing minimum pause between checkpoints to 
" + minPauseBetweenCheckpoints + " ms (1 year)");
+               }
+
                this.job = checkNotNull(job);
                this.baseInterval = baseInterval;
                this.checkpointTimeout = checkpointTimeout;
-               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
                this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -417,13 +424,16 @@ public class CheckpointCoordinator {
                                return false;
                        }
 
-                       //make sure the minimum interval between checkpoints 
has passed
-                       if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp && baseInterval != Long.MAX_VALUE) {
+                       // make sure the minimum interval between checkpoints 
has passed
+                       final long earliestNext = lastCheckpointCompletionNanos 
+ minPauseBetweenCheckpointsNanos;
+                       final long durationTillNextMillis = (earliestNext - 
System.nanoTime()) / 1_000_000;
+
+                       if (durationTillNextMillis > 0 && baseInterval != 
Long.MAX_VALUE) {
                                if (currentPeriodicTrigger != null) {
                                        currentPeriodicTrigger.cancel();
                                }
                                currentPeriodicTrigger = new ScheduledTrigger();
-                               
timer.scheduleAtFixedRate(currentPeriodicTrigger, minPauseBetweenCheckpoints, 
baseInterval);
+                               
timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, 
baseInterval);
                                return false;
                        }
                }
@@ -458,8 +468,6 @@ public class CheckpointCoordinator {
                }
 
                // we will actually trigger this checkpoint!
-
-               lastTriggeredCheckpoint = timestamp;
                final long checkpointID;
                if (nextCheckpointId < 0) {
                        try {
@@ -532,6 +540,19 @@ public class CheckpointCoordinator {
                                        return false;
                                }
 
+                               // make sure the minimum interval between 
checkpoints has passed
+                               final long earliestNext = 
lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+                               final long durationTillNextMillis = 
(earliestNext - System.nanoTime()) / 1_000_000;
+
+                               if (durationTillNextMillis > 0 && baseInterval 
!= Long.MAX_VALUE) {
+                                       if (currentPeriodicTrigger != null) {
+                                               currentPeriodicTrigger.cancel();
+                                       }
+                                       currentPeriodicTrigger = new 
ScheduledTrigger();
+                                       
timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, 
baseInterval);
+                                       return false;
+                               }
+
                                pendingCheckpoints.put(checkpointID, 
checkpoint);
                                timer.schedule(canceller, checkpointTimeout);
                        }
@@ -682,8 +703,10 @@ public class CheckpointCoordinator {
 
                                switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), 
message.getStateSize(), null)) {
                                        case SUCCESS:
-                                               // TODO: Give KV-state to the 
acknowledgeTask method
+
                                                if 
(checkpoint.isFullyAcknowledged()) {
+
+                                                       
lastCheckpointCompletionNanos = System.nanoTime();
                                                        completed = 
checkpoint.finalizeCheckpoint();
 
                                                        
completedCheckpointStore.addCheckpoint(completed);
@@ -805,6 +828,8 @@ public class CheckpointCoordinator {
         * <p>NOTE: The caller of this method must hold the lock when invoking 
the method!
         */
        private void triggerQueuedRequests() {
+               assert Thread.holdsLock(lock);
+
                if (triggerRequestQueued) {
                        triggerRequestQueued = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 35cce85..9159711 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -41,6 +41,8 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -50,6 +52,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -1193,102 +1196,92 @@ public class CheckpointCoordinatorTest {
         * another is triggered.
         */
        @Test
-       public void testMinInterval() {
-               try {
-                       final JobID jid = new JobID();
+       public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+               final JobID jid = new JobID();
 
-                       // create some mock execution vertices and trigger some 
checkpoint
-                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
-                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+               // create some mock execution vertices and trigger some 
checkpoint
+               final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+               ExecutionVertex vertex = mockExecutionVertex(attemptID);
 
-                       final AtomicInteger numCalls = new AtomicInteger();
+               final BlockingQueue<TriggerCheckpoint> calls = new 
LinkedBlockingQueue<>();
 
-                       doAnswer(new Answer<Void>() {
-                               @Override
-                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                       if (invocation.getArguments()[0] 
instanceof TriggerCheckpoint) {
-                                               numCalls.incrementAndGet();
-                                       }
-                                       return null;
-                               }
-                       
}).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), 
any(ExecutionAttemptID.class));
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                               calls.add((TriggerCheckpoint) 
invocation.getArguments()[0]);
+                               return null;
+                       }
+               
}).when(vertex).sendMessageToCurrentExecution(isA(TriggerCheckpoint.class), 
eq(attemptID));
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               10,             // periodic interval is 10 ms
-                               200000, // timeout is very long (200 s)
-                               500,    // 500ms delay between checkpoints
-                               10,
-                               42,
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               cl,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               RecoveryMode.STANDALONE,
-                               new DisabledCheckpointStatsTracker(),
-                               TestExecutors.directExecutor());
+               final long delay = 50;
 
+               final CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       2,           // periodic interval is 2 ms
+                       200_000,     // timeout is very long (200 s)
+                       delay,       // 50ms delay between checkpoints
+                       1,
+                       42,
+                       new ExecutionVertex[] { vertex },
+                       new ExecutionVertex[] { vertex },
+                       new ExecutionVertex[] { vertex },
+                       cl,
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(2, cl),
+                       RecoveryMode.STANDALONE,
+                       new DisabledCheckpointStatsTracker(),
+                       TestExecutors.directExecutor());
+
+               try {
                        coord.startCheckpointScheduler();
 
-                       //wait until the first checkpoint was triggered
-                       for (int x=0; x<20; x++) {
-                               Thread.sleep(100);
-                               if (numCalls.get() > 0) {
-                                       break;
-                               }
-                       }
+                       // wait until the first checkpoint was triggered
+                       TriggerCheckpoint call = calls.take();
+                       assertEquals(1L, call.getCheckpointId());
+                       assertEquals(jid, call.getJob());
+                       assertEquals(attemptID, call.getTaskExecutionId());
 
-                       if (numCalls.get() == 0) {
-                               fail("No checkpoint was triggered within the 
first 2000 ms.");
-                       }
-                       
-                       long start = System.currentTimeMillis();
-
-                       for (int x = 0; x < 20; x++) {
-                               Thread.sleep(100);
-                               int triggeredCheckpoints = numCalls.get();
-                               long curT = System.currentTimeMillis();
-
-                               /**
-                                * Within a given time-frame T only T/500 
checkpoints may be triggered due to the configured minimum
-                                * interval between checkpoints. This value 
however does not not take the first triggered checkpoint
-                                * into account (=> +1). Furthermore we have to 
account for the mis-alignment between checkpoints
-                                * being triggered and our time measurement (=> 
+1); for T=1200 a total of 3-4 checkpoints may have been
-                                * triggered depending on whether the end of 
the minimum interval for the first checkpoints ends before
-                                * or after T=200.
-                                */
-                               long maxAllowedCheckpoints = (curT - start) / 
500 + 2;
-                               assertTrue(maxAllowedCheckpoints >= 
triggeredCheckpoints);
-                       }
+                       // tell the coordinator that the checkpoint is done
+                       final long ackTime = System.nanoTime();
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID, 1L));
 
-                       coord.stopCheckpointScheduler();
+                       // wait until the next checkpoint is triggered
+                       TriggerCheckpoint nextCall = calls.take();
+                       final long nextCheckpointTime = System.nanoTime();
 
+                       assertEquals(2L, nextCall.getCheckpointId());
+                       assertEquals(jid, nextCall.getJob());
+                       assertEquals(attemptID, nextCall.getTaskExecutionId());
+
+                       final long delayMillis = (nextCheckpointTime - ackTime) 
/ 1_000_000;
+
+                       // we need to add one ms here to account for rounding 
errors
+                       if (delayMillis + 1 < delay) {
+                               fail("checkpoint came too early: delay was " + 
delayMillis + " but should have been at least " + delay);
+                       }
+               }
+               finally {
+                       coord.stopCheckpointScheduler();
                        coord.shutdown();
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }               
        }
 
        @Test
        public void testMaxConcurrentAttempts1() {
-               testMaxConcurrentAttemps(1);
+               testMaxConcurrentAttempts(1);
        }
 
        @Test
        public void testMaxConcurrentAttempts2() {
-               testMaxConcurrentAttemps(2);
+               testMaxConcurrentAttempts(2);
        }
 
        @Test
        public void testMaxConcurrentAttempts5() {
-               testMaxConcurrentAttemps(5);
+               testMaxConcurrentAttempts(5);
        }
        
-       private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+       private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
                try {
                        final JobID jid = new JobID();
 

Reply via email to