Repository: flink
Updated Branches:
  refs/heads/master 81114d5f7 -> fcb13e1f5


[FLINK-6404] [FLINK-6400] Ensure PendingCheckpoint is registered when calling 
Checkpoint Hooks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcb13e1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcb13e1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcb13e1f

Branch: refs/heads/master
Commit: fcb13e1f54cc8d634416b41d5fc41518806a1885
Parents: 81114d5
Author: Stephan Ewen <[email protected]>
Authored: Thu Apr 27 22:31:10 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Apr 27 22:36:48 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 24 ++++------
 .../runtime/checkpoint/PendingCheckpoint.java   | 28 +++++++-----
 .../CheckpointCoordinatorMasterHooksTest.java   | 46 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 23a38d4..fb6cc72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,7 +41,6 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -540,20 +539,6 @@ public class CheckpointCoordinator {
                                checkpoint.setStatsCallback(callback);
                        }
 
-                       // trigger the master hooks for the checkpoint
-                       try {
-                               List<MasterState> masterStates = 
MasterHooks.triggerMasterHooks(masterHooks.values(),
-                                               checkpointID, timestamp, 
executor, Time.milliseconds(checkpointTimeout));
-
-                               for (MasterState s : masterStates) {
-                                       checkpoint.addMasterState(s);
-                               }
-                       }
-                       catch (FlinkException e) {
-                               checkpoint.abortError(e);
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
-                       }
-
                        // schedule the timer that will clean up the expired 
checkpoints
                        final Runnable canceller = new Runnable() {
                                @Override
@@ -628,6 +613,13 @@ public class CheckpointCoordinator {
                                                // checkpoint is already 
disposed!
                                                cancellerHandle.cancel(false);
                                        }
+
+                                       // trigger the master hooks for the 
checkpoint
+                                       final List<MasterState> masterStates = 
MasterHooks.triggerMasterHooks(masterHooks.values(),
+                                                       checkpointID, 
timestamp, executor, Time.milliseconds(checkpointTimeout));
+                                       for (MasterState s : masterStates) {
+                                               checkpoint.addMasterState(s);
+                                       }
                                }
                                // end of lock scope
 
@@ -656,7 +648,7 @@ public class CheckpointCoordinator {
                                LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
 
                                if (!checkpoint.isDiscarded()) {
-                                       checkpoint.abortError(new 
Exception("Failed to trigger checkpoint"));
+                                       checkpoint.abortError(new 
Exception("Failed to trigger checkpoint", t));
                                }
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index ce97edc..cc3dce2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -351,17 +351,6 @@ public class PendingCheckpoint {
        }
 
        /**
-        * Adds a master state (state generated on the checkpoint coordinator) 
to
-        * the pending checkpoint.
-        * 
-        * @param state The state to add
-        */
-       public void addMasterState(MasterState state) {
-               checkNotNull(state);
-               masterState.add(state);
-       }
-
-       /**
         * Acknowledges the task with the given execution attempt id and the 
given subtask state.
         *
         * @param executionAttemptId of the acknowledged task
@@ -453,7 +442,22 @@ public class PendingCheckpoint {
                }
        }
 
-       
+       /**
+        * Adds a master state (state generated on the checkpoint coordinator)
+        * to the pending checkpoint.
+        *
+        * <p>The state is added while holding {@code lock}, and only if this
+        * pending checkpoint has not been discarded. If the checkpoint was
+        * already discarded, the call is a no-op: the state is dropped and
+        * nothing is reported back to the caller.
+        *
+        * @param state The state to add; it is passed through
+        *              {@code checkNotNull} before the lock is taken
+        */
+       public void addMasterState(MasterState state) {
+               checkNotNull(state);
+
+               synchronized (lock) {
+                       if (!discarded) {
+                               masterState.add(state);
+                       }
+               }
+       }
+
 
        // 
------------------------------------------------------------------------
        //  Cancellation

http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 0ec4606..7c271a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -31,6 +32,9 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 
 import org.junit.Test;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -46,6 +50,7 @@ import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mock
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -316,6 +321,47 @@ public class CheckpointCoordinatorMasterHooksTest {
        //  failure scenarios
        // 
------------------------------------------------------------------------
 
+       /**
+        * This test makes sure that the checkpoint is already registered by
+        * the time that the hooks are called.
+        *
+        * <p>The mocked hook's {@code triggerCheckpoint} answer asserts, at the
+        * moment it is invoked, that the coordinator reports exactly one
+        * pending checkpoint and that a pending checkpoint is registered under
+        * the checkpoint ID passed as the hook's first argument. The test then
+        * adds the hook and expects the coordinator's {@code triggerCheckpoint}
+        * call to return {@code true}.
+        */
+       @Test
+       public void ensureRegisteredAtHookTime() throws Exception {
+               final String id = "id";
+
+               // create the checkpoint coordinator
+               final JobID jid = new JobID();
+               final ExecutionAttemptID execId = new ExecutionAttemptID();
+               final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+               final CheckpointCoordinator cc = 
instantiateCheckpointCoordinator(jid, ackVertex);
+
+               final MasterTriggerRestoreHook<Void> hook = 
mockGeneric(MasterTriggerRestoreHook.class);
+               when(hook.getIdentifier()).thenReturn(id);
+               when(hook.triggerCheckpoint(anyLong(), anyLong(), 
any(Executor.class))).thenAnswer(
+                               new Answer<Future<Void>>() {
+
+                                       @Override
+                                       public Future<Void> 
answer(InvocationOnMock invocation) throws Throwable {
+                                               assertEquals(1, 
cc.getNumberOfPendingCheckpoints());
+
+                                               long checkpointId = (Long) 
invocation.getArguments()[0];
+                                               
assertNotNull(cc.getPendingCheckpoints().get(checkpointId));
+                                               return null;
+                                       }
+                               }
+               );
+
+               cc.addMasterHook(hook);
+
+               // trigger a checkpoint
+               assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), 
false));
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  failure scenarios
+       // 
------------------------------------------------------------------------
+
        @Test
        public void testSerializationFailsOnTrigger() {
        }

Reply via email to