[FLINK-4322] [checkpointing] Add and fix tests for unified Checkpoint/Savepoint Coordinator
This closes #2366 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47acdead Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47acdead Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47acdead Branch: refs/heads/master Commit: 47acdeadf3ccd326578306453dd10ed3c147a4e6 Parents: 76ca1a7 Author: Ufuk Celebi <[email protected]> Authored: Thu Aug 11 19:40:07 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Aug 17 19:18:07 2016 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 138 +-- .../checkpoint/CheckpointTriggerResult.java | 17 +- .../runtime/checkpoint/PendingSavepoint.java | 11 +- .../StandaloneCheckpointIDCounter.java | 9 + .../StandaloneCompletedCheckpointStore.java | 2 +- .../checkpoint/savepoint/SavepointLoader.java | 4 +- .../checkpoint/savepoint/SavepointV0.java | 7 +- .../runtime/executiongraph/ExecutionGraph.java | 6 - .../flink/runtime/jobmanager/JobManager.scala | 20 +- .../checkpoint/CheckpointCoordinatorTest.java | 598 +++++++--- .../checkpoint/CheckpointStateRestoreTest.java | 80 +- .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/CompletedCheckpointTest.java | 58 + ...ExecutionGraphCheckpointCoordinatorTest.java | 33 +- .../checkpoint/PendingCheckpointTest.java | 134 +++ .../checkpoint/PendingSavepointTest.java | 141 +++ .../savepoint/HeapSavepointStoreTest.java | 25 - .../savepoint/SavepointCoordinatorTest.java | 1119 ------------------ .../savepoint/SavepointLoaderTest.java | 110 ++ .../stats/SimpleCheckpointStatsTrackerTest.java | 5 +- .../runtime/jobmanager/JobManagerITCase.scala | 32 +- .../test/checkpointing/SavepointITCase.java | 297 +---- 22 files changed, 1033 insertions(+), 1815 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6d44e61..2a1ece0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; @@ -45,9 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; -import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -69,10 +67,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * It triggers the checkpoint by sending the messages to the relevant tasks and collects the * checkpoint acknowledgements. 
It also collects and maintains the overview of the state handles * reported by the tasks that acknowledge the checkpoint. - * - * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link - * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone - * implementations don't support any recovery. */ public class CheckpointCoordinator { @@ -156,9 +150,6 @@ public class CheckpointCoordinator { /** Flag marking the coordinator as shut down (not accepting any messages any more) */ private volatile boolean shutdown; - /** Shutdown hook thread to clean up state handles. */ - private final Thread shutdownHook; - /** Helper for tracking checkpoint statistics */ private final CheckpointStatsTracker statsTracker; @@ -180,7 +171,6 @@ public class CheckpointCoordinator { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, SavepointStore savepointStore, - RecoveryMode recoveryMode, CheckpointStatsTracker statsTracker) throws Exception { // sanity checks @@ -188,7 +178,6 @@ public class CheckpointCoordinator { checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); - checkArgument(numberKeyGroups >= 1, "numberKeyGroups must be >= 1"); this.job = checkNotNull(job); this.baseInterval = baseInterval; @@ -198,52 +187,21 @@ public class CheckpointCoordinator { this.tasksToTrigger = checkNotNull(tasksToTrigger); this.tasksToWaitFor = checkNotNull(tasksToWaitFor); this.tasksToCommitTo = checkNotNull(tasksToCommitTo); - this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>(); + this.pendingCheckpoints = new LinkedHashMap<>(); this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); 
this.savepointStore = checkNotNull(savepointStore); - this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS); + this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.userClassLoader = checkNotNull(userClassLoader); this.statsTracker = checkNotNull(statsTracker); this.numberKeyGroups = numberKeyGroups; this.timer = new Timer("Checkpoint Timer", true); - if (recoveryMode == RecoveryMode.STANDALONE) { - // Add shutdown hook to clean up state handles when no checkpoint recovery is - // possible. In case of another configured recovery mode, the checkpoints need to be - // available for the standby job managers. - this.shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - CheckpointCoordinator.this.shutdown(); - } - catch (Throwable t) { - LOG.error("Error during shutdown of checkpoint coordinator via " + - "JVM shutdown hook: " + t.getMessage(), t); - } - } - }); - - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - } - catch (IllegalStateException ignored) { - // JVM is already shutting down. No need to do anything. - } - catch (Throwable t) { - LOG.error("Cannot register checkpoint coordinator shutdown hook.", t); - } - } - else { - this.shutdownHook = null; - } - - // make sure the checkpoint ID enumerator is running try { - checkpointIdCounter.start(); + // Make sure the checkpoint ID enumerator is running. Possibly + // issues a blocking call to ZooKeeper. 
+ checkpointIDCounter.start(); } catch (Exception e) { throw new Exception("Failed to start checkpoint ID counter: " + e.getMessage(), e); } @@ -285,50 +243,34 @@ public class CheckpointCoordinator { */ private void shutdown(boolean shutdownStoreAndCounter) throws Exception { synchronized (lock) { - try { - if (!shutdown) { - shutdown = true; - LOG.info("Stopping checkpoint coordinator for job " + job); + if (!shutdown) { + shutdown = true; + LOG.info("Stopping checkpoint coordinator for job " + job); - periodicScheduling = false; - triggerRequestQueued = false; + periodicScheduling = false; + triggerRequestQueued = false; - // shut down the thread that handles the timeouts and pending triggers - timer.cancel(); + // shut down the thread that handles the timeouts and pending triggers + timer.cancel(); - // make sure that the actor does not linger - if (jobStatusListener != null) { - jobStatusListener.tell(PoisonPill.getInstance()); - jobStatusListener = null; - } + // make sure that the actor does not linger + if (jobStatusListener != null) { + jobStatusListener.tell(PoisonPill.getInstance()); + jobStatusListener = null; + } - // clear and discard all pending checkpoints - for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.abortError(new Exception("Checkpoint Coordinator is shutting down")); - } - pendingCheckpoints.clear(); - - if (shutdownStoreAndCounter) { - completedCheckpointStore.shutdown(); - checkpointIdCounter.shutdown(); - } else { - completedCheckpointStore.suspend(); - checkpointIdCounter.suspend(); - } + // clear and discard all pending checkpoints + for (PendingCheckpoint pending : pendingCheckpoints.values()) { + pending.abortError(new Exception("Checkpoint Coordinator is shutting down")); } - } finally { - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the - // shutdown hook itself. 
- if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException ignored) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t); - } + pendingCheckpoints.clear(); + + if (shutdownStoreAndCounter) { + completedCheckpointStore.shutdown(); + checkpointIdCounter.shutdown(); + } else { + completedCheckpointStore.suspend(); + checkpointIdCounter.suspend(); } } } @@ -350,10 +292,7 @@ public class CheckpointCoordinator { return savepoint.getCompletionFuture(); } else { - final Promise<String> promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); - promise.failure( - new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); - return promise.future(); + return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); } } @@ -586,9 +525,8 @@ public class CheckpointCoordinator { rememberRecentCheckpointId(checkpointId); boolean haveMoreRecentPending = false; - Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); - while (entries.hasNext()) { - PendingCheckpoint p = entries.next().getValue(); + + for (PendingCheckpoint p : pendingCheckpoints.values()) { if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { haveMoreRecentPending = true; break; @@ -746,7 +684,7 @@ public class CheckpointCoordinator { while (entries.hasNext()) { PendingCheckpoint p = entries.next().getValue(); - if (p.getCheckpointTimestamp() < timestamp && p.canBeSubsumed()) { + if (p.getCheckpointTimestamp() <= timestamp && p.canBeSubsumed()) { rememberRecentCheckpointId(p.getCheckpointId()); p.abortSubsumed(); entries.remove(); @@ -895,7 +833,7 @@ public class CheckpointCoordinator { public Map<Long, PendingCheckpoint> 
getPendingCheckpoints() { synchronized (lock) { - return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints); + return new HashMap<>(this.pendingCheckpoints); } } @@ -913,14 +851,6 @@ public class CheckpointCoordinator { return checkpointIdCounter; } - protected ActorGateway getJobStatusListener() { - return jobStatusListener; - } - - protected void setJobStatusListener(ActorGateway jobStatusListener) { - this.jobStatusListener = jobStatusListener; - } - // -------------------------------------------------------------------------------------------- // Periodic scheduling of checkpoints // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java index 3f91407..65dc73f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.checkpoint; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The result of triggering a checkpoint. May be a declined checkpoint trigger attempt, - * or a pending checkpoint. + * The result of triggering a checkpoint. May either be a declined checkpoint + * trigger attempt, or a pending checkpoint. 
*/ class CheckpointTriggerResult { @@ -68,7 +68,7 @@ class CheckpointTriggerResult { if (success != null) { return success; } else { - throw new IllegalStateException(); + throw new IllegalStateException("Checkpoint triggering failed"); } } @@ -76,7 +76,7 @@ class CheckpointTriggerResult { if (failure != null) { return failure; } else { - throw new IllegalStateException(); + throw new IllegalStateException("Checkpoint triggering was successful"); } } @@ -84,8 +84,9 @@ class CheckpointTriggerResult { @Override public String toString() { - return isSuccess() ? - ("success: " + success) : - ("failure: " + failure.message()); + return "CheckpointTriggerResult(" + + (isSuccess() ? + ("success: " + success) : + ("failure: " + failure.message())) + ")"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java index 92cdd04..460ff8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java @@ -25,9 +25,7 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.util.ExceptionUtils; - import org.slf4j.Logger; - import scala.concurrent.Future; import scala.concurrent.Promise; @@ -107,7 +105,14 @@ public class PendingSavepoint extends PendingCheckpoint { @Override public void abortSubsumed() throws Exception { - throw new Exception("Bug: Savepoints must never be subsumed"); + try { + Exception e = new 
Exception("Bug: Savepoints must never be subsumed"); + onCompletionPromise.failure(e); + throw e; + } + finally { + dispose(true); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java index c0ea93d..0a235bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java @@ -50,4 +50,13 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter { public void setCount(long newCount) { checkpointIdCounter.set(newCount); } + + /** + * Returns the last checkpoint ID (current - 1). + * + * @return Last checkpoint ID. 
+ */ + public long getLast() { + return checkpointIdCounter.get() - 1; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 0f6cf33..bc111cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}. */ -class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { +public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { /** The maximum number of checkpoints to retain (at least 1). 
*/ private final int maxNumberOfCheckpointsToRetain; http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 0b7b0c2..1be7a58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -27,8 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. 
*/ @@ -54,7 +52,7 @@ public class SavepointLoader { String savepointPath) throws Exception { // (1) load the savepoint - Savepoint savepoint = savepointStore.loadSavepoint(checkNotNull(savepointPath)); + Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size()); // (2) validate it (parallelism, etc) http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java index 9fd950d..d60d80e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; import java.util.Collection; /** @@ -37,11 +38,11 @@ public class SavepointV0 implements Savepoint { private final long checkpointId; /** The task states */ - private final Collection<TaskState> taskStates; + private final Collection<TaskState> taskStates = new ArrayList(); - SavepointV0(long checkpointId, Collection<TaskState> taskStates) { + public SavepointV0(long checkpointId, Collection<TaskState> taskStates) { this.checkpointId = checkpointId; - this.taskStates = Preconditions.checkNotNull(taskStates, "Task States"); + this.taskStates.addAll(taskStates); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 82826dd..12d8e66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorSystem; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; @@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; @@ -59,10 +57,8 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -353,7 +349,6 @@ public class ExecutionGraph { UUID leaderSessionID, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, - RecoveryMode recoveryMode, SavepointStore savepointStore, CheckpointStatsTracker statsTracker) throws Exception { @@ -389,7 +384,6 @@ public class ExecutionGraph { checkpointIDCounter, checkpointStore, savepointStore, - recoveryMode, 
checkpointStatsTracker); // the periodic checkpoint scheduler is activated and deactivated as a result of http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9fb01bf..a82e89a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1093,10 +1093,10 @@ class JobManager( Option(jobGraph.getSerializedExecutionConfig() .deserializeValue(userCodeLoader) .getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy(_)) match { - case Some(strategy) => strategy - case None => restartStrategyFactory.createRestartStrategy() - } + .map(RestartStrategyFactory.createRestartStrategy(_)) match { + case Some(strategy) => strategy + case None => restartStrategyFactory.createRestartStrategy() + } log.info(s"Using restart strategy $restartStrategy for $jobId.") @@ -1253,7 +1253,6 @@ class JobManager( leaderSessionID.orNull, checkpointIdCounter, completedCheckpoints, - recoveryMode, savepointStore, checkpointStatsTracker) } @@ -1294,7 +1293,6 @@ class JobManager( // because it is a blocking operation future { try { - if (isRecovery) { // this is a recovery of a master failure (this master takes over) executionGraph.restoreLatestCheckpointedState() @@ -1305,7 +1303,7 @@ class JobManager( val snapshotSettings = jobGraph.getSnapshotSettings if (snapshotSettings != null) { val savepointPath = snapshotSettings.getSavepointPath() - + if (savepointPath != null) { // got a savepoint try { @@ -1316,14 +1314,14 @@ class JobManager( jobId, executionGraph.getAllVertices, savepointStore, savepointPath) 
executionGraph.getCheckpointCoordinator.getCheckpointStore - .addCheckpoint(savepoint) - + .addCheckpoint(savepoint) + // Reset the checkpoint ID counter val nextCheckpointId: Long = savepoint.getCheckpointID + 1 log.info(s"Reset the checkpoint ID to $nextCheckpointId") executionGraph.getCheckpointCoordinator.getCheckpointIdCounter - .setCount(nextCheckpointId) - + .setCount(nextCheckpointId) + executionGraph.restoreLatestCheckpointedState() } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 62af42b..3341095 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -19,13 +19,13 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; @@ -33,6 +33,7 
@@ import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import scala.concurrent.Future; import java.io.Serializable; import java.util.Iterator; @@ -80,17 +81,19 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -131,17 +134,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new 
DisabledCheckpointStatsTracker()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -180,17 +186,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] {}, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] {}, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -230,17 +239,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ 
-353,17 +365,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -472,17 +487,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - new ExecutionVertex[] { vertex1, vertex2 }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -621,17 +639,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and 
validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, - new ExecutionVertex[] { commitVertex }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, + new ExecutionVertex[] { commitVertex }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -755,17 +776,20 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 42, - new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, - new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, - new ExecutionVertex[] { commitVertex }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(10, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, + new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, + new ExecutionVertex[] { commitVertex }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(10, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -875,17 +899,20 @@ public class CheckpointCoordinatorTest { // the 
timeout for the checkpoint is a 200 milliseconds CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 200, - 42, - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] { commitVertex }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - RecoveryMode.STANDALONE); + jid, + 600000, + 200, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] { commitVertex }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp)); @@ -942,17 +969,20 @@ public class CheckpointCoordinatorTest { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 200000, - 200000, - 42, - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex1, ackVertex2 }, - new ExecutionVertex[] { commitVertex }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - RecoveryMode.STANDALONE); + jid, + 200000, + 200000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex1, ackVertex2 }, + new ExecutionVertex[] { commitVertex }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); assertTrue(coord.triggerCheckpoint(timestamp)); @@ -1020,17 +1050,20 @@ public class CheckpointCoordinatorTest { }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class)); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic 
interval is 10 ms - 200000, // timeout is very long (200 s) - 42, - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - RecoveryMode.STANDALONE); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1110,20 +1143,20 @@ public class CheckpointCoordinatorTest { }).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class)); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 500, // 500ms delay between checkpoints - 10, - 42, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - RecoveryMode.STANDALONE, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 500, // 500ms delay between checkpoints + 10, + 42, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1170,20 +1203,236 @@ public class CheckpointCoordinatorTest { @Test public void testMaxConcurrentAttempts1() { - testMaxConcurrentAttemps(1); + 
testMaxConcurrentAttempts(1); } @Test public void testMaxConcurrentAttempts2() { - testMaxConcurrentAttemps(2); + testMaxConcurrentAttempts(2); } @Test public void testMaxConcurrentAttempts5() { - testMaxConcurrentAttemps(5); + testMaxConcurrentAttempts(5); } - private void testMaxConcurrentAttemps(int maxConcurrentAttempts) { + @Test + public void testTriggerAndConfirmSimpleSavepoint() throws Exception { + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + final ExecutionAttemptID attemptID2 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + // trigger the first checkpoint. 
this should succeed + Future<String> savepointFuture = coord.triggerSavepoint(timestamp); + assertFalse(savepointFuture.isCompleted()); + + // validate that we have a pending savepoint + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + + long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId); + + assertNotNull(pending); + assertEquals(checkpointId, pending.getCheckpointId()); + assertEquals(timestamp, pending.getCheckpointTimestamp()); + assertEquals(jid, pending.getJobId()); + assertEquals(2, pending.getNumberOfNonAcknowledgedTasks()); + assertEquals(0, pending.getNumberOfAcknowledgedTasks()); + assertEquals(0, pending.getTaskStates().size()); + assertFalse(pending.isDiscarded()); + assertFalse(pending.isFullyAcknowledged()); + assertFalse(pending.canBeSubsumed()); + assertTrue(pending instanceof PendingSavepoint); + + + // acknowledge from one of the tasks + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); + assertEquals(1, pending.getNumberOfAcknowledgedTasks()); + assertEquals(1, pending.getNumberOfNonAcknowledgedTasks()); + assertFalse(pending.isDiscarded()); + assertFalse(pending.isFullyAcknowledged()); + assertFalse(savepointFuture.isCompleted()); + + // acknowledge the same task again (should not matter) + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); + assertFalse(pending.isDiscarded()); + assertFalse(pending.isFullyAcknowledged()); + assertFalse(savepointFuture.isCompleted()); + + // acknowledge the other task. 
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId)); + + // the checkpoint is internally converted to a successful checkpoint and the + // pending checkpoint object is disposed + assertTrue(pending.isDiscarded()); + assertTrue(savepointFuture.isCompleted()); + + // now we should have a completed checkpoint + assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + + // validate that the relevant tasks got a confirmation message + { + NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp); + NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp); + verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1)); + verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); + } + + CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); + assertEquals(jid, success.getJobId()); + assertEquals(timestamp, success.getTimestamp()); + assertEquals(pending.getCheckpointId(), success.getCheckpointID()); + assertTrue(success.getTaskStates().isEmpty()); + + // --------------- + // trigger another checkpoint and see that this one replaces the other checkpoint + // --------------- + final long timestampNew = timestamp + 7; + savepointFuture = coord.triggerSavepoint(timestampNew); + assertFalse(savepointFuture.isCompleted()); + + long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew)); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0); + assertEquals(jid, successNew.getJobId()); + assertEquals(timestampNew, successNew.getTimestamp()); + assertEquals(checkpointIdNew, successNew.getCheckpointID()); + assertTrue(successNew.getTaskStates().isEmpty()); + assertTrue(savepointFuture.isCompleted()); + + // validate that the relevant tasks got a confirmation message + { + TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew); + TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew); + verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1)); + verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2)); + + NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew); + NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew); + verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1)); + verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); + } + + coord.shutdown(); + } + + /** + * Triggers a savepoint and two checkpoints. The second checkpoint completes + * and subsumes the first checkpoint, but not the first savepoint. Then we + * trigger another checkpoint and savepoint. The 2nd savepoint completes and + * subsumes the last checkpoint, but not the first savepoint. 
+ */ + @Test + public void testSavepointsAreNotSubsumed() throws Exception { + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + final ExecutionAttemptID attemptID2 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); + + StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter(); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + cl, + counter, + new StandaloneCompletedCheckpointStore(10, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); + + // Trigger savepoint and checkpoint + Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp); + long savepointId1 = counter.getLast(); + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + + assertTrue(coord.triggerCheckpoint(timestamp + 1)); + assertEquals(2, coord.getNumberOfPendingCheckpoints()); + + assertTrue(coord.triggerCheckpoint(timestamp + 2)); + long checkpointId2 = counter.getLast(); + assertEquals(3, coord.getNumberOfPendingCheckpoints()); + + // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2)); + + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); + 
assertFalse(savepointFuture1.isCompleted()); + + assertTrue(coord.triggerCheckpoint(timestamp + 3)); + assertEquals(2, coord.getNumberOfPendingCheckpoints()); + + Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp); + long savepointId2 = counter.getLast(); + assertEquals(3, coord.getNumberOfPendingCheckpoints()); + + // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2)); + + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); + + assertFalse(savepointFuture1.isCompleted()); + assertTrue(savepointFuture2.isCompleted()); + + // Ack first savepoint + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1)); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertTrue(savepointFuture1.isCompleted()); + } + + private void testMaxConcurrentAttempts(int maxConcurrentAttempts) { try { final JobID jid = new JobID(); @@ -1207,17 +1456,18 @@ public class CheckpointCoordinatorTest { }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class)); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - maxConcurrentAttempts, - 42, - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter - (), new StandaloneCompletedCheckpointStore(2, cl), 
RecoveryMode.STANDALONE, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter + (), new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1278,17 +1528,18 @@ public class CheckpointCoordinatorTest { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - maxConcurrentAttempts, // max two concurrent checkpoints - 42, - new ExecutionVertex[] { triggerVertex }, - new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter - (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, // max two concurrent checkpoints + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter + (), new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1358,17 +1609,18 @@ public class CheckpointCoordinatorTest { }); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 10, // periodic interval is 10 ms - 200000, // timeout is very long (200 s) - 0L, // no extra delay - 2, // max two concurrent checkpoints - 42, - new ExecutionVertex[] { triggerVertex }, - new 
ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE, - new DisabledCheckpointStatsTracker()); + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + 2, // max two concurrent checkpoints + 42, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 2b1b7e1..061059a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -19,13 +19,14 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; +import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; 
-import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.LocalStateHandle; import org.apache.flink.runtime.state.StateHandle; @@ -39,7 +40,10 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests concerning the restoring of state from a checkpoint to the task executions. @@ -81,17 +85,20 @@ public class CheckpointStateRestoreTest { CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 200000L, - 200000L, - 42, - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[0], - cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 200000L, + 200000L, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[0], + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -158,17 +165,20 @@ public class CheckpointStateRestoreTest { CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 200000L, - 200000L, - 42, - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, - new ExecutionVertex[0], - cl, - new 
StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - RecoveryMode.STANDALONE); + jid, + 200000L, + 200000L, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, + new ExecutionVertex[0], + cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -206,15 +216,19 @@ public class CheckpointStateRestoreTest { public void testNoCheckpointAvailable() { try { CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), - 200000L, - 200000L, - 42, - new ExecutionVertex[] { mock(ExecutionVertex.class) }, - new ExecutionVertex[] { mock(ExecutionVertex.class) }, - new ExecutionVertex[0], cl, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE); + new JobID(), + 200000L, + 200000L, + 0, + Integer.MAX_VALUE, + 42, + new ExecutionVertex[] { mock(ExecutionVertex.class) }, + new ExecutionVertex[] { mock(ExecutionVertex.class) }, + new ExecutionVertex[0], cl, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); try { coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false); http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 
84d809a..634e177 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -238,7 +238,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { long timestamp, Map<JobVertexID, TaskState> taskGroupStates) { - super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates); + super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, true); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java new file mode 100644 index 0000000..90a6836 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class CompletedCheckpointTest { + + /** + * Tests that the `deleteStateWhenDisposed` flag is correctly forwarded. + */ + @Test + public void testDiscard() throws Exception { + TaskState state = mock(TaskState.class); + Map<JobVertexID, TaskState> taskStates = new HashMap<>(); + taskStates.put(new JobVertexID(), state); + + // Verify discard call is forwarded to state + CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true); + checkpoint.discard(ClassLoader.getSystemClassLoader()); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + + Mockito.reset(state); + + // Verify discard call is not forwarded to state + checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false); + checkpoint.discard(ClassLoader.getSystemClassLoader()); + verify(state, times(0)).discard(Matchers.any(ClassLoader.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 0f2c2b2..7b05fd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -27,26 +27,22 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.AfterClass; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import java.lang.reflect.Field; import java.net.URL; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -59,28 +55,6 @@ public class ExecutionGraphCheckpointCoordinatorTest { public static void teardown() { JavaTestKit.shutdownActorSystem(system); } - - @Test - public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception { - ExecutionGraph executionGraph = createExecutionGraphAndEnableCheckpointing( - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader())); - - CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator(); - - // 
Both the checkpoint and savepoint coordinator need to operate - // with the same checkpoint ID counter. - Field counterField = CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter"); - - CheckpointIDCounter counterCheckpointCoordinator = (CheckpointIDCounter) counterField - .get(checkpointCoordinator); - - CheckpointIDCounter counterSavepointCoordinator = (CheckpointIDCounter) counterField - .get(savepointCoordinator); - - assertEquals(counterCheckpointCoordinator, counterSavepointCoordinator); - } /** * Tests that a shut down checkpoint coordinator calls shutdown on @@ -94,8 +68,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store); graph.fail(new Exception("Test Exception")); - // Two times, because shared with savepoint coordinator - verify(counter, times(2)).shutdown(); + verify(counter, times(1)).shutdown(); verify(store, times(1)).shutdown(); } @@ -115,8 +88,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { verify(counter, times(0)).shutdown(); verify(store, times(0)).shutdown(); - // Two times, because shared with savepoint coordinator - verify(counter, times(2)).suspend(); + verify(counter, times(1)).suspend(); verify(store, times(1)).suspend(); } @@ -149,7 +121,6 @@ public class ExecutionGraphCheckpointCoordinatorTest { UUID.randomUUID(), counter, store, - RecoveryMode.STANDALONE, new HeapSavepointStore(), new DisabledCheckpointStatsTracker()); http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java new file mode 100644 index 0000000..d235e61 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class PendingCheckpointTest { + + private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>(); + private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); + + static { + ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); + } + + /** + * Tests that pending checkpoints can be subsumed. 
+ */ + @Test + public void testCanBeSubsumed() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(); + assertTrue(pending.canBeSubsumed()); + } + + /** + * Tests that abort discards state. + */ + @Test + @SuppressWarnings("unchecked") + public void testAbort() throws Exception { + TaskState state = mock(TaskState.class); + + // Abort declined + PendingCheckpoint pending = createPendingCheckpoint(); + setTaskState(pending, state); + + pending.abortDeclined(); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + + // Abort error + Mockito.reset(state); + + pending = createPendingCheckpoint(); + setTaskState(pending, state); + + pending.abortError(new Exception("Expected Test Exception")); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + + // Abort expired + Mockito.reset(state); + + pending = createPendingCheckpoint(); + setTaskState(pending, state); + + pending.abortExpired(); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + + // Abort subsumed + Mockito.reset(state); + + pending = createPendingCheckpoint(); + setTaskState(pending, state); + + pending.abortSubsumed(); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + } + + /** + * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is + * correctly set to true. 
+ */ + @Test + public void testFinalizeCheckpoint() throws Exception { + TaskState state = mock(TaskState.class); + PendingCheckpoint pending = createPendingCheckpoint(); + PendingCheckpointTest.setTaskState(pending, state); + + pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); + + CompletedCheckpoint checkpoint = pending.finalizeCheckpoint(); + + // Does discard state + checkpoint.discard(ClassLoader.getSystemClassLoader()); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + } + + // ------------------------------------------------------------------------ + + private static PendingCheckpoint createPendingCheckpoint() { + ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); + return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader); + } + + @SuppressWarnings("unchecked") + static void setTaskState(PendingCheckpoint pending, TaskState state) throws NoSuchFieldException, IllegalAccessException { + Field field = PendingCheckpoint.class.getDeclaredField("taskStates"); + field.setAccessible(true); + Map<JobVertexID, TaskState> taskStates = (Map<JobVertexID, TaskState>) field.get(pending); + + taskStates.put(new JobVertexID(), state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java new file mode 100644 index 0000000..6ae6e1c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class PendingSavepointTest { + + private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>(); + private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); + + static { + ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); + } + + /** + * Tests that pending savepoints cannot be subsumed. 
+ */ + @Test + public void testCanBeSubsumed() throws Exception { + PendingSavepoint pending = createPendingSavepoint(); + assertFalse(pending.canBeSubsumed()); + } + + /** + * Tests that abort discards state and fails the completion future. + */ + @Test + @SuppressWarnings("unchecked") + public void testAbort() throws Exception { + TaskState state = mock(TaskState.class); + + // Abort declined + PendingSavepoint pending = createPendingSavepoint(); + PendingCheckpointTest.setTaskState(pending, state); + + pending.abortDeclined(); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + + // Abort error + Mockito.reset(state); + + pending = createPendingSavepoint(); + PendingCheckpointTest.setTaskState(pending, state); + Future<String> future = pending.getCompletionFuture(); + + pending.abortError(new Exception("Expected Test Exception")); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + assertTrue(future.failed().isCompleted()); + + // Abort expired + Mockito.reset(state); + + pending = createPendingSavepoint(); + PendingCheckpointTest.setTaskState(pending, state); + future = pending.getCompletionFuture(); + + pending.abortExpired(); + verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + assertTrue(future.failed().isCompleted()); + + // Abort subsumed + pending = createPendingSavepoint(); + + try { + pending.abortSubsumed(); + fail("Did not throw expected Exception"); + } catch (Throwable ignored) { // expected + } + } + + /** + * Tests that the CompletedCheckpoint `deleteStateWhenDisposed` flag is + * correctly set to false.
+ */ + @Test + public void testFinalizeCheckpoint() throws Exception { + TaskState state = mock(TaskState.class); + PendingSavepoint pending = createPendingSavepoint(); + PendingCheckpointTest.setTaskState(pending, state); + + Future<String> future = pending.getCompletionFuture(); + + pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); + + CompletedCheckpoint checkpoint = pending.finalizeCheckpoint(); + + // Does _NOT_ discard state + checkpoint.discard(ClassLoader.getSystemClassLoader()); + verify(state, times(0)).discard(Matchers.any(ClassLoader.class)); + + // Future is completed + String path = Await.result(future, Duration.Zero()); + assertNotNull(path); + } + + // ------------------------------------------------------------------------ + + private static PendingSavepoint createPendingSavepoint() { + ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); + return new PendingSavepoint(new JobID(), 0, 1, ackTasks, classLoader, new HeapSavepointStore()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java deleted file mode 100644 index ec3dd0a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStoreTest.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -public class HeapSavepointStoreTest { - - - -}
