Repository: flink Updated Branches: refs/heads/master 81114d5f7 -> fcb13e1f5
[FLINK-6404] [FLINK-6400] Ensure PendingCheckpoint is registered when calling Checkpoint Hooks Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcb13e1f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcb13e1f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcb13e1f Branch: refs/heads/master Commit: fcb13e1f54cc8d634416b41d5fc41518806a1885 Parents: 81114d5 Author: Stephan Ewen <[email protected]> Authored: Thu Apr 27 22:31:10 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Apr 27 22:36:48 2017 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 24 ++++------ .../runtime/checkpoint/PendingCheckpoint.java | 28 +++++++----- .../CheckpointCoordinatorMasterHooksTest.java | 46 ++++++++++++++++++++ 3 files changed, 70 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 23a38d4..fb6cc72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -540,20 +539,6 @@ public class CheckpointCoordinator { checkpoint.setStatsCallback(callback); } - // trigger the master hooks for the checkpoint - try { - List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), - checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); - - for (MasterState s : masterStates) { - checkpoint.addMasterState(s); - } - } - catch (FlinkException e) { - checkpoint.abortError(e); - return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); - } - // schedule the timer that will clean up the expired checkpoints final Runnable canceller = new Runnable() { @Override @@ -628,6 +613,13 @@ public class CheckpointCoordinator { // checkpoint is already disposed! cancellerHandle.cancel(false); } + + // trigger the master hooks for the checkpoint + final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), + checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); + for (MasterState s : masterStates) { + checkpoint.addMasterState(s); + } } // end of lock scope @@ -656,7 +648,7 @@ public class CheckpointCoordinator { LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); if (!checkpoint.isDiscarded()) { - checkpoint.abortError(new Exception("Failed to trigger checkpoint")); + checkpoint.abortError(new Exception("Failed to trigger checkpoint", t)); } return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index ce97edc..cc3dce2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -351,17 +351,6 @@ public class PendingCheckpoint { } /** - * Adds a master state (state generated on the checkpoint coordinator) to - * the pending checkpoint. - * - * @param state The state to add - */ - public void addMasterState(MasterState state) { - checkNotNull(state); - masterState.add(state); - } - - /** * Acknowledges the task with the given execution attempt id and the given subtask state. * * @param executionAttemptId of the acknowledged task @@ -453,7 +442,22 @@ public class PendingCheckpoint { } } - + /** + * Adds a master state (state generated on the checkpoint coordinator) to + * the pending checkpoint. + * + * @param state The state to add + */ + public void addMasterState(MasterState state) { + checkNotNull(state); + + synchronized (lock) { + if (!discarded) { + masterState.add(state); + } + } + } + // ------------------------------------------------------------------------ // Cancellation http://git-wip-us.apache.org/repos/asf/flink/blob/fcb13e1f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 0ec4606..7c271a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -31,6 +32,9 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -46,6 +50,7 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mock import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -316,6 +321,47 @@ public class CheckpointCoordinatorMasterHooksTest { // failure scenarios // ------------------------------------------------------------------------ + /** + * This test makes sure that the checkpoint is already registered by the time + * that the hooks are called + */ + @Test + public void ensureRegisteredAtHookTime() throws Exception { + final String id = "id"; + + // create the checkpoint coordinator + final JobID jid = new JobID(); + final ExecutionAttemptID execId = new ExecutionAttemptID(); + final ExecutionVertex ackVertex = mockExecutionVertex(execId); + final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex); + + final MasterTriggerRestoreHook<Void> hook = mockGeneric(MasterTriggerRestoreHook.class); + when(hook.getIdentifier()).thenReturn(id); + when(hook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))).thenAnswer( + new Answer<Future<Void>>() { + + @Override + public Future<Void> answer(InvocationOnMock invocation) throws Throwable { + assertEquals(1, cc.getNumberOfPendingCheckpoints()); + + long checkpointId = (Long) invocation.getArguments()[0]; + assertNotNull(cc.getPendingCheckpoints().get(checkpointId)); + return null; + } + } + ); + + cc.addMasterHook(hook); + + // trigger a checkpoint + assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false)); + } + + + // ------------------------------------------------------------------------ + // failure scenarios + // ------------------------------------------------------------------------ + @Test public void testSerializationFailsOnTrigger() { }
