Repository: flink Updated Branches: refs/heads/master f79168052 -> 47acdeadf
[FLINK-4322] [checkpointing] Unify CheckpointCoordinator and SavepointCoordinator The CheckpointCoordinator now also takes over the role of the SavepointCoordinator. Savepoints are just like other checkpoints - they only store the metadata in addition. Restoring from a savepoint means loading it into the CheckpointStore at startup. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76ca1a79 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76ca1a79 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76ca1a79 Branch: refs/heads/master Commit: 76ca1a7955fedd8583a0af12289a14d0f1bcf868 Parents: f791680 Author: Stephan Ewen <[email protected]> Authored: Mon Aug 8 19:18:44 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Aug 17 19:06:02 2016 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 257 ++++++------ .../checkpoint/CheckpointDeclineReason.java | 50 +++ .../checkpoint/CheckpointProperties.java | 61 +++ .../checkpoint/CheckpointTriggerResult.java | 91 +++++ .../runtime/checkpoint/CompletedCheckpoint.java | 64 ++- .../runtime/checkpoint/PendingCheckpoint.java | 130 +++++-- .../runtime/checkpoint/PendingSavepoint.java | 155 ++++++++ .../savepoint/SavepointCoordinator.java | 386 ------------------- .../SavepointCoordinatorDeActivator.java | 64 --- .../checkpoint/savepoint/SavepointLoader.java | 98 +++++ .../runtime/executiongraph/ExecutionGraph.java | 92 +---- .../flink/runtime/jobmanager/JobManager.scala | 68 ++-- .../checkpoint/CheckpointPropertiesTest.java | 37 ++ 13 files changed, 768 insertions(+), 785 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6aaa014..6d44e61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,9 +21,10 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; + import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -39,9 +40,14 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; + +import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -75,8 +81,10 @@ public class CheckpointCoordinator { /** The number of recent checkpoints whose IDs are remembered */ private static final int NUM_GHOST_CHECKPOINT_IDS = 16; + // ------------------------------------------------------------------------ + /** Coordinator-wide lock to safeguard the checkpoint updates */ - protected final Object lock = new Object(); + private final Object 
lock = new Object(); /** The job whose checkpoint this coordinator coordinates */ private final JobID job; @@ -97,12 +105,15 @@ public class CheckpointCoordinator { * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** Store for savepoints. */ + private final SavepointStore savepointStore; + /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */ private final ArrayDeque<Long> recentPendingCheckpoints; /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these * need to be ascending across job managers. */ - protected final CheckpointIDCounter checkpointIdCounter; + private final CheckpointIDCounter checkpointIdCounter; /** Class loader used to deserialize the state handles (as they may be user-defined) */ private final ClassLoader userClassLoader; @@ -151,7 +162,7 @@ public class CheckpointCoordinator { /** Helper for tracking checkpoint statistics */ private final CheckpointStatsTracker statsTracker; - protected final int numberKeyGroups; + private final int numberKeyGroups; // -------------------------------------------------------------------------------------------- @@ -159,25 +170,6 @@ public class CheckpointCoordinator { JobID job, long baseInterval, long checkpointTimeout, - int numberKeyGroups, - ExecutionVertex[] tasksToTrigger, - ExecutionVertex[] tasksToWaitFor, - ExecutionVertex[] tasksToCommitTo, - ClassLoader userClassLoader, - CheckpointIDCounter checkpointIDCounter, - CompletedCheckpointStore completedCheckpointStore, - RecoveryMode recoveryMode) throws Exception { - - this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, numberKeyGroups, - tasksToTrigger, tasksToWaitFor, tasksToCommitTo, - userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode, - new DisabledCheckpointStatsTracker()); - } - - public CheckpointCoordinator( - JobID job, - long baseInterval, - long 
checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpointAttempts, int numberKeyGroups, @@ -187,14 +179,16 @@ public class CheckpointCoordinator { ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, + SavepointStore savepointStore, RecoveryMode recoveryMode, CheckpointStatsTracker statsTracker) throws Exception { - // Sanity check + // sanity checks checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); + checkArgument(numberKeyGroups >= 1, "numberKeyGroups must be >= 1"); this.job = checkNotNull(job); this.baseInterval = baseInterval; @@ -205,17 +199,16 @@ public class CheckpointCoordinator { this.tasksToWaitFor = checkNotNull(tasksToWaitFor); this.tasksToCommitTo = checkNotNull(tasksToCommitTo); this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>(); + this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); + this.savepointStore = checkNotNull(savepointStore); this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS); - this.userClassLoader = userClassLoader; - - // Started with the periodic scheduler - this.checkpointIdCounter = checkNotNull(checkpointIDCounter); + this.userClassLoader = checkNotNull(userClassLoader); + this.statsTracker = checkNotNull(statsTracker); + this.numberKeyGroups = numberKeyGroups; this.timer = new Timer("Checkpoint Timer", true); - this.statsTracker = checkNotNull(statsTracker); - if (recoveryMode == RecoveryMode.STANDALONE) { // Add shutdown hook to clean up state handles when no checkpoint recovery is // possible. 
In case of another configured recovery mode, the checkpoints need to be @@ -248,29 +241,12 @@ public class CheckpointCoordinator { this.shutdownHook = null; } - this.numberKeyGroups = numberKeyGroups; - } - - // -------------------------------------------------------------------------------------------- - // Callbacks - // -------------------------------------------------------------------------------------------- - - /** - * Callback on shutdown of the coordinator. Called in lock scope. - */ - protected void onShutdown() { - } - - /** - * Callback on cancellation of a checkpoint. Called in lock scope. - */ - protected void onCancelCheckpoint(long canceledCheckpointId) { - } - - /** - * Callback on full acknowledgement of a checkpoint. Called in lock scope. - */ - protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) { + // make sure the checkpoint ID enumerator is running + try { + checkpointIdCounter.start(); + } catch (Exception e) { + throw new Exception("Failed to start checkpoint ID counter: " + e.getMessage(), e); + } } // -------------------------------------------------------------------------------------------- @@ -328,7 +304,7 @@ public class CheckpointCoordinator { // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.discard(userClassLoader); + pending.abortError(new Exception("Checkpoint Coordinator is shutting down")); } pendingCheckpoints.clear(); @@ -339,8 +315,6 @@ public class CheckpointCoordinator { completedCheckpointStore.suspend(); checkpointIdCounter.suspend(); } - - onShutdown(); } } finally { // Remove shutdown hook to prevent resource leaks, unless this is invoked by the @@ -368,6 +342,21 @@ public class CheckpointCoordinator { // Handling checkpoints and messages // -------------------------------------------------------------------------------------------- + public Future<String> triggerSavepoint(long timestamp) throws Exception { + 
CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + + if (result.isSuccess()) { + PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); + return savepoint.getCompletionFuture(); + } + else { + final Promise<String> promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + promise.failure( + new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return promise.future(); + } + } + /** * Triggers a new checkpoint and uses the given timestamp as the checkpoint * timestamp. @@ -375,42 +364,39 @@ public class CheckpointCoordinator { * @param timestamp The timestamp for the checkpoint. */ public boolean triggerCheckpoint(long timestamp) throws Exception { - return triggerCheckpoint(timestamp, -1); + return triggerCheckpoint(timestamp, CheckpointProperties.forStandardCheckpoint()).isSuccess(); } - /** - * Triggers a new checkpoint and uses the given timestamp as the checkpoint - * timestamp. - * - * @param timestamp The timestamp for the checkpoint. - * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if - * the checkpoint ID counter should be queried. 
- */ - public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception { + CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props) throws Exception { // make some eager pre-checks synchronized (lock) { // abort if the coordinator has been shutdown in the meantime if (shutdown) { - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } - // sanity check: there should never be more than one trigger request queued - if (triggerRequestQueued) { - LOG.warn("Trying to trigger another checkpoint while one was queued already"); - return false; - } + // validate whether the checkpoint can be triggered, with respect to the limit of + // concurrent checkpoints, and the minimum time between checkpoints. + // these checks are not relevant for savepoints + if (!props.isSavepoint()) { + // sanity check: there should never be more than one trigger request queued + if (triggerRequestQueued) { + LOG.warn("Trying to trigger another checkpoint while one was queued already"); + return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); + } - // if too many checkpoints are currently in progress, we need to mark that a request is queued - if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { - triggerRequestQueued = true; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); - currentPeriodicTrigger = null; + // if too many checkpoints are currently in progress, we need to mark that a request is queued + if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { + triggerRequestQueued = true; + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } - return false; } - //make sure the minimum interval between checkpoints has passed + // make sure the minimum interval between 
checkpoints has passed if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(); @@ -418,11 +404,11 @@ public class CheckpointCoordinator { } ScheduledTrigger trigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval); - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } - // first check if all tasks that we need to trigger are running. + // check if all tasks that we need to trigger are running. // if not, abort the checkpoint ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { @@ -432,7 +418,7 @@ public class CheckpointCoordinator { } else { LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getSimpleName()); - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } @@ -447,7 +433,7 @@ public class CheckpointCoordinator { } else { LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", ev.getSimpleName()); - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } @@ -455,25 +441,22 @@ public class CheckpointCoordinator { lastTriggeredCheckpoint = timestamp; final long checkpointID; - if (nextCheckpointId < 0) { - try { - // this must happen outside the locked scope, because it communicates - // with external services (in HA mode) and may block for a while. 
- checkpointID = checkpointIdCounter.getAndIncrement(); - } - catch (Throwable t) { - int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; - LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); - return false; - } + try { + // this must happen outside the locked scope, because it communicates + // with external services (in HA mode) and may block for a while. + checkpointID = checkpointIdCounter.getAndIncrement(); } - else { - checkpointID = nextCheckpointId; + catch (Throwable t) { + int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; + LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); - final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); + final PendingCheckpoint checkpoint = props.isSavepoint() ? 
+ new PendingSavepoint(job, checkpointID, timestamp, ackTasks, userClassLoader, savepointStore) : + new PendingCheckpoint(job, checkpointID, timestamp, ackTasks, userClassLoader); // schedule the timer that will clean up the expired checkpoints TimerTask canceller = new TimerTask() { @@ -486,12 +469,10 @@ public class CheckpointCoordinator { if (!checkpoint.isDiscarded()) { LOG.info("Checkpoint " + checkpointID + " expired before completing."); - checkpoint.discard(userClassLoader); + checkpoint.abortExpired(); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); - onCancelCheckpoint(checkpointID); - triggerQueuedRequests(); } } @@ -511,19 +492,21 @@ public class CheckpointCoordinator { // blocking progress, and still gives us early checks that skip work // if no checkpoint can happen anyways if (shutdown) { - return false; - } - else if (triggerRequestQueued) { - LOG.warn("Trying to trigger another checkpoint while one was queued already"); - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } - else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { - triggerRequestQueued = true; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); - currentPeriodicTrigger = null; + else if (!props.isSavepoint()) { + if (triggerRequestQueued) { + LOG.warn("Trying to trigger another checkpoint while one was queued already"); + return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); + } + else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { + triggerRequestQueued = true; + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } - return false; } pendingCheckpoints.put(checkpointID, checkpoint); @@ -539,7 +522,7 @@ public class CheckpointCoordinator { } 
numUnsuccessfulCheckpointsTriggers = 0; - return true; + return new CheckpointTriggerResult(checkpoint); } catch (Throwable t) { // guard the map against concurrent modifications @@ -549,10 +532,11 @@ public class CheckpointCoordinator { int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + if (!checkpoint.isDiscarded()) { - checkpoint.discard(userClassLoader); + checkpoint.abortError(new Exception("Failed to trigger checkpoint")); } - return false; + return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } } @@ -598,11 +582,9 @@ public class CheckpointCoordinator { + " because of checkpoint decline from task " + message.getTaskExecutionId()); pendingCheckpoints.remove(checkpointId); - checkpoint.discard(userClassLoader); + checkpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); - onCancelCheckpoint(checkpointId); - boolean haveMoreRecentPending = false; Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); while (entries.hasNext()) { @@ -683,6 +665,7 @@ public class CheckpointCoordinator { message.getState(), message.getStateSize(), null)) { // TODO: Give KV-state to the acknowledgeTask method + if (checkpoint.isFullyAcknowledged()) { completed = checkpoint.finalizeCheckpoint(); @@ -705,8 +688,6 @@ public class CheckpointCoordinator { dropSubsumedCheckpoints(completed.getTimestamp()); - onFullyAcknowledgedCheckpoint(completed); - triggerQueuedRequests(); } } @@ -762,15 +743,12 @@ public class CheckpointCoordinator { private void dropSubsumedCheckpoints(long timestamp) throws Exception { Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); + while (entries.hasNext()) { PendingCheckpoint p = entries.next().getValue(); - if (p.getCheckpointTimestamp() < timestamp) { + if (p.getCheckpointTimestamp() < timestamp && p.canBeSubsumed()) { 
rememberRecentCheckpointId(p.getCheckpointId()); - - p.discard(userClassLoader); - - onCancelCheckpoint(p.getCheckpointId()); - + p.abortSubsumed(); entries.remove(); } } @@ -927,17 +905,12 @@ public class CheckpointCoordinator { } } - protected long getAndIncrementCheckpointId() { - try { - // this must happen outside the locked scope, because it communicates - // with external services (in HA mode) and may block for a while. - return checkpointIdCounter.getAndIncrement(); - } - catch (Throwable t) { - int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; - LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); - return -1; - } + public CompletedCheckpointStore getCheckpointStore() { + return completedCheckpointStore; + } + + public CheckpointIDCounter getCheckpointIdCounter() { + return checkpointIdCounter; } protected ActorGateway getJobStatusListener() { @@ -961,14 +934,6 @@ public class CheckpointCoordinator { // make sure all prior timers are cancelled stopCheckpointScheduler(); - try { - // Multiple start calls are OK - checkpointIdCounter.start(); - } catch (Exception e) { - String msg = "Failed to start checkpoint ID counter: " + e.getMessage(); - throw new RuntimeException(msg, e); - } - periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); @@ -986,7 +951,7 @@ public class CheckpointCoordinator { } for (PendingCheckpoint p : pendingCheckpoints.values()) { - p.discard(userClassLoader); + p.abortError(new Exception("Checkpoint Coordinator is suspending.")); } pendingCheckpoints.clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java new file mode 100644 index 0000000..2cc9094 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +/** + * Various reasons why a checkpoint was declined. + */ +public enum CheckpointDeclineReason { + + COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."), + + ALREADY_QUEUED("Another checkpoint request has already been queued."), + + TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"), + + MINIMUM_TIME_BETWEEN_CHECKPOINTS("The minimum time between checkpoints is still pending. 
" + + "Checkpoint will be triggered after the minimum time."), + + NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."), + + EXCEPTION("An Exception occurred while triggering the checkpoint."); + + // ------------------------------------------------------------------------ + + private final String message; + + CheckpointDeclineReason(String message) { + this.message = message; + } + + public String message() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java new file mode 100644 index 0000000..7ea645a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +/** + * The configuration of a checkpoint, such as whether + * <ul> + * <li>The checkpoint is a savepoint</li> + * <li>The checkpoint must be full, or may be incremental</li> + * <li>The checkpoint format must be the common (cross backend) format, or may be state-backend specific</li> + * </ul> + */ +public class CheckpointProperties { + + private final boolean isSavepoint; + + private CheckpointProperties(boolean isSavepoint) { + this.isSavepoint = isSavepoint; + } + + // ------------------------------------------------------------------------ + + public boolean isSavepoint() { + return isSavepoint; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "CheckpointProperties {" + + "isSavepoint=" + isSavepoint + + '}'; + } + + // ------------------------------------------------------------------------ + + public static CheckpointProperties forStandardSavepoint() { + return new CheckpointProperties(true); + } + + public static CheckpointProperties forStandardCheckpoint() { + return new CheckpointProperties(false); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java new file mode 100644 index 0000000..3f91407 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The result of triggering a checkpoint. May be a declined checkpoint trigger attempt, + * or a pending checkpoint. + */ +class CheckpointTriggerResult { + + /** If success, the pending checkpoint created after the successfully trigger, otherwise null */ + private final PendingCheckpoint success; + + /** If failure, the reason why the triggering was declined, otherwise null. */ + private final CheckpointDeclineReason failure; + + // ------------------------------------------------------------------------ + + /** + * Creates a successful checkpoint trigger result. + * + * @param success The pending checkpoint created after the successfully trigger. + */ + CheckpointTriggerResult(PendingCheckpoint success) { + this.success = checkNotNull(success); + this.failure = null; + } + + /** + * Creates a failed checkpoint trigger result. + * + * @param failure The reason why the checkpoint could not be triggered. 
+ */ + CheckpointTriggerResult(CheckpointDeclineReason failure) { + this.success = null; + this.failure = checkNotNull(failure); + } + + // ------------------------------------------------------------------------ + + public boolean isSuccess() { + return success != null; + } + + public boolean isFailure() { + return failure != null; + } + + public PendingCheckpoint getPendingCheckpoint() { + if (success != null) { + return success; + } else { + throw new IllegalStateException(); + } + } + + public CheckpointDeclineReason getFailureReason() { + if (failure != null) { + return failure; + } else { + throw new IllegalStateException(); + } + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return isSuccess() ? + ("success: " + success) : + ("failure: " + failure.message()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index ecc67b4..d8c6c7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -20,11 +20,12 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.Map; -import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * A successful checkpoint describes a checkpoint 
after all required tasks acknowledged it (with their state) @@ -47,20 +48,34 @@ public class CompletedCheckpoint implements Serializable { /** States of the different task groups belonging to this checkpoint */ private final Map<JobVertexID, TaskState> taskStates; - public CompletedCheckpoint( - JobID job, - long checkpointID, - long timestamp, - long completionTimestamp, - Map<JobVertexID, TaskState> taskStates) { + /** Flag to indicate whether the completed checkpoint data should be deleted when this + * handle to the checkpoint is disposed */ + private final boolean deleteStateWhenDisposed; - this.job = Preconditions.checkNotNull(job); + // ------------------------------------------------------------------------ + + public CompletedCheckpoint( + JobID job, + long checkpointID, + long timestamp, + long completionTimestamp, + Map<JobVertexID, TaskState> taskStates, + boolean deleteStateWhenDisposed) { + + checkArgument(checkpointID >= 0); + checkArgument(timestamp >= 0); + checkArgument(completionTimestamp >= 0); + + this.job = checkNotNull(job); this.checkpointID = checkpointID; this.timestamp = timestamp; this.duration = completionTimestamp - timestamp; - this.taskStates = Preconditions.checkNotNull(taskStates); + this.taskStates = checkNotNull(taskStates); + this.deleteStateWhenDisposed = deleteStateWhenDisposed; } + // ------------------------------------------------------------------------ + public JobID getJobId() { return job; } @@ -96,10 +111,12 @@ public class CompletedCheckpoint implements Serializable { } // -------------------------------------------------------------------------------------------- - + public void discard(ClassLoader userClassLoader) throws Exception { - for (TaskState state: taskStates.values()) { - state.discard(userClassLoader); + if (deleteStateWhenDisposed) { + for (TaskState state: taskStates.values()) { + state.discard(userClassLoader); + } } taskStates.clear(); @@ -108,27 +125,6 @@ public class CompletedCheckpoint implements 
Serializable { // -------------------------------------------------------------------------------------------- @Override - public boolean equals(Object obj) { - if (obj instanceof CompletedCheckpoint) { - CompletedCheckpoint other = (CompletedCheckpoint) obj; - - return job.equals(other.job) && checkpointID == other.checkpointID && - timestamp == other.timestamp && duration == other.duration && - taskStates.equals(other.taskStates); - } else { - return false; - } - } - - @Override - public int hashCode() { - return (int) (this.checkpointID ^ this.checkpointID >>> 32) + - 31 * ((int) (this.timestamp ^ this.timestamp >>> 32) + - 31 * ((int) (this.duration ^ this.duration >>> 32) + - 31 * Objects.hash(job, taskStates))); - } - - @Override public String toString() { return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); } http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index cd1f6c4..ab3fe0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -28,54 +28,73 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.SerializedValue; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A pending checkpoint is a checkpoint that has been started, but has not been * acknowledged by all tasks that need to acknowledge it. 
Once all tasks have * acknowledged it, it becomes a {@link CompletedCheckpoint}. * * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the - * state handles always as serialized values, never as actual values.</p> + * state handles always as serialized values, never as actual values. */ public class PendingCheckpoint { - + private final Object lock = new Object(); - + private final JobID jobId; - + private final long checkpointId; - + private final long checkpointTimestamp; private final Map<JobVertexID, TaskState> taskStates; private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks; - + + private final ClassLoader userCodeClassLoader; + + private final boolean disposeWhenSubsumed; + private int numAcknowledgedTasks; - + private boolean discarded; - + // -------------------------------------------------------------------------------------------- - - public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp, - Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm) + + public PendingCheckpoint( + JobID jobId, + long checkpointId, + long checkpointTimestamp, + Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, + ClassLoader userCodeClassLoader) { + this(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, userCodeClassLoader, true); + } + + PendingCheckpoint( + JobID jobId, + long checkpointId, + long checkpointTimestamp, + Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, + ClassLoader userCodeClassLoader, + boolean disposeWhenSubsumed) { - if (jobId == null || verticesToConfirm == null) { - throw new NullPointerException(); - } - if (verticesToConfirm.size() == 0) { - throw new IllegalArgumentException("Checkpoint needs at least one vertex that commits the checkpoint"); - } - - this.jobId = jobId; + this.jobId = checkNotNull(jobId); this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; - - this.notYetAcknowledgedTasks = 
verticesToConfirm; + this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm); + this.userCodeClassLoader = checkNotNull(userCodeClassLoader); + this.disposeWhenSubsumed = disposeWhenSubsumed; this.taskStates = new HashMap<>(); + + checkArgument(verticesToConfirm.size() > 0, + "Checkpoint needs at least one vertex that commits the checkpoint"); } - - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ public JobID getJobId() { return jobId; @@ -92,7 +111,7 @@ public class PendingCheckpoint { public int getNumberOfNonAcknowledgedTasks() { return notYetAcknowledgedTasks.size(); } - + public int getNumberOfAcknowledgedTasks() { return numAcknowledgedTasks; } @@ -104,11 +123,25 @@ public class PendingCheckpoint { public boolean isFullyAcknowledged() { return this.notYetAcknowledgedTasks.isEmpty() && !discarded; } - + public boolean isDiscarded() { return discarded; } - + + /** + * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless + * of newer checkpoints in progress. + * + * @return True if the checkpoint can be subsumed, false otherwise. 
+ */ + public boolean canBeSubsumed() { + return true; + } + + // ------------------------------------------------------------------------ + // Progress and Completion + // ------------------------------------------------------------------------ + public CompletedCheckpoint finalizeCheckpoint() throws Exception { synchronized (lock) { if (discarded) { @@ -120,9 +153,11 @@ public class PendingCheckpoint { checkpointId, checkpointTimestamp, System.currentTimeMillis(), - new HashMap<>(taskStates)); - dispose(null, false); - + new HashMap<>(taskStates), + disposeWhenSubsumed); + + dispose(false); + return completed; } else { @@ -190,22 +225,45 @@ public class PendingCheckpoint { } } } - + + // ------------------------------------------------------------------------ + // Cancellation + // ------------------------------------------------------------------------ + + /** + * Aborts a checkpoint because it expired (took too long). + */ + public void abortExpired() throws Exception { + dispose(true); + } + + /** + * Aborts the pending checkpoint because a newer completed checkpoint subsumed it. + */ + public void abortSubsumed() throws Exception { + dispose(true); + } + + public void abortDeclined() throws Exception { + dispose(true); + } + /** - * Discards the pending checkpoint, releasing all held resources. + * Aborts the pending checkpoint due to an error. + * @param cause The error's exception. 
*/ - public void discard(ClassLoader userClassLoader) throws Exception { - dispose(userClassLoader, true); + public void abortError(Throwable cause) throws Exception { + dispose(true); } - private void dispose(ClassLoader userClassLoader, boolean releaseState) throws Exception { + protected void dispose(boolean releaseState) throws Exception { synchronized (lock) { discarded = true; numAcknowledgedTasks = -1; try { if (releaseState) { for (TaskState taskState : taskStates.values()) { - taskState.discard(userClassLoader); + taskState.discard(userCodeClassLoader); } } } finally { @@ -219,7 +277,7 @@ public class PendingCheckpoint { @Override public String toString() { - return String.format("PendingCheckpoint %d @ %d - confirmed=%d, pending=%d", + return String.format("Pending Checkpoint %d @ %d - confirmed=%d, pending=%d", checkpointId, checkpointTimestamp, getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java new file mode 100644 index 0000000..92cdd04 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; + +import scala.concurrent.Future; +import scala.concurrent.Promise; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A pending savepoint is like a pending checkpoint, but it additionally performs some + * actions upon completion, like notifying the triggerer. 
+ */ +public class PendingSavepoint extends PendingCheckpoint { + + private static final Logger LOG = CheckpointCoordinator.LOG; + + private final SavepointStore store; + + /** The promise to fulfill once the savepoint is complete */ + private final Promise<String> onCompletionPromise; + + // -------------------------------------------------------------------------------------------- + + public PendingSavepoint( + JobID jobId, + long checkpointId, + long checkpointTimestamp, + Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, + ClassLoader userCodeClassLoader, + SavepointStore store) + { + super(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, userCodeClassLoader, false); + + this.store = checkNotNull(store); + this.onCompletionPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + } + + // -------------------------------------------------------------------------------------------- + // Savepoint completion + // -------------------------------------------------------------------------------------------- + + public Future<String> getCompletionFuture() { + return onCompletionPromise.future(); + } + + @Override + public CompletedCheckpoint finalizeCheckpoint() throws Exception { + // finalize checkpoint (this also disposes this pending checkpoint) + CompletedCheckpoint completedCheckpoint = super.finalizeCheckpoint(); + + // now store the checkpoint externally as a savepoint + try { + Savepoint savepoint = new SavepointV0( + completedCheckpoint.getCheckpointID(), + completedCheckpoint.getTaskStates().values()); + + String path = store.storeSavepoint(savepoint); + onCompletionPromise.success(path); + } + catch (Throwable t) { + LOG.warn("Failed to store savepoint.", t); + onCompletionPromise.failure(t); + + ExceptionUtils.rethrow(t, "Failed to store savepoint."); + } + + return completedCheckpoint; + } + + // ------------------------------------------------------------------------ + // Cancellation / Disposal + // 
------------------------------------------------------------------------ + + @Override + public boolean canBeSubsumed() { + return false; + } + + @Override + public void abortSubsumed() throws Exception { + throw new Exception("Bug: Savepoints must never be subsumed"); + } + + @Override + public void abortExpired() throws Exception { + try { + LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " expired before completing."); + onCompletionPromise.failure(new Exception("Savepoint expired before completing")); + } + finally { + dispose(true); + } + } + + @Override + public void abortDeclined() throws Exception { + try { + LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " was declined (tasks not ready)."); + onCompletionPromise.failure(new Exception("Savepoint was declined (tasks not ready)")); + } + finally { + dispose(true); + } + } + + @Override + public void abortError(Throwable cause) throws Exception { + try { + LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " failed due to an error", cause); + onCompletionPromise.failure( + new Exception("Savepoint could not be completed: " + cause.getMessage(), cause)); + } + finally { + dispose(true); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + return String.format("Pending Savepoint %d @ %d - confirmed=%d, pending=%d", + getCheckpointId(), getCheckpointTimestamp(), + getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java deleted file mode 100644 index cd9eb0c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import akka.actor.ActorSystem; -import akka.actor.Props; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.RecoveryMode; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.Promise; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The savepoint coordinator is a slightly modified variant of the regular - * checkpoint coordinator. Checkpoints are not triggered periodically, but - * manually. The actual checkpointing mechanism is the same as for periodic - * checkpoints, only the control flow is modified. - * - * <p>The savepoint coordinator is meant to be used as a separate coordinator - * instance. 
Otherwise, there can be unwanted queueing effects like discarding - * savepoints, because of in-progress periodic checkpoints. - * - * <p>The savepoint coordinator registers callbacks on the regular checkpoint - * life-cycle and manages a map of promises, which are completed/failed as soon - * as the trigged checkpoint is done. - * - * <p><strong>Important</strong>: it's necessary that both the periodic - * checkpoint coordinator and the savepoint coordinator <em>share</em> the same - * instance of the {@link CheckpointIDCounter} to ensure that all task managers - * see ascending checkpoints IDs. - */ -public class SavepointCoordinator extends CheckpointCoordinator { - - private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class); - - /** Store for savepoints. */ - private final SavepointStore savepointStore; - - /** Mapping from checkpoint ID to promises for savepoints. */ - private final Map<Long, Promise<String>> savepointPromises; - - // TODO(uce) Temporary work around to restore initial state on - // failure during recovery. Will be superseded by FLINK-3397. 
- private volatile String savepointRestorePath; - - public SavepointCoordinator( - JobID jobId, - long baseInterval, - long checkpointTimeout, - int numberKeyGroups, - ExecutionVertex[] tasksToTrigger, - ExecutionVertex[] tasksToWaitFor, - ExecutionVertex[] tasksToCommitTo, - ClassLoader userClassLoader, - CheckpointIDCounter checkpointIDCounter, - SavepointStore savepointStore, - CheckpointStatsTracker statsTracker) throws Exception { - - super(jobId, - baseInterval, - checkpointTimeout, - 0L, - Integer.MAX_VALUE, - numberKeyGroups, - tasksToTrigger, - tasksToWaitFor, - tasksToCommitTo, - userClassLoader, - checkpointIDCounter, - IgnoreCheckpointsStore.INSTANCE, - RecoveryMode.STANDALONE, - statsTracker); - - this.savepointStore = checkNotNull(savepointStore); - this.savepointPromises = new ConcurrentHashMap<>(); - } - - public String getSavepointRestorePath() { - return savepointRestorePath; - } - - // ------------------------------------------------------------------------ - // Savepoint trigger and reset - // ------------------------------------------------------------------------ - - /** - * Triggers a new savepoint using the current system time as the checkpoint timestamp. - */ - public Future<String> triggerSavepoint(long timestamp) throws Exception { - final Promise<String> promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - try { - // Get the checkpoint ID up front. If we fail to trigger the checkpoint, - // the ID will have changed, but this is OK as long as the checkpoint ID - // generates ascending IDs. - final long checkpointId = getAndIncrementCheckpointId(); - - if (checkpointId == -1) { - throw new IllegalStateException("Failed to get checkpoint Id"); - } - - LOG.info("Triggering savepoint with ID " + checkpointId); - - // Important: make sure to add the promise to the map before calling - // any methods that might trigger callbacks, which require the promise. - // Otherwise, the might be race conditions. 
- if (savepointPromises.put(checkpointId, promise) == null) { - boolean success = false; - - try { - // All good. The future will be completed as soon as the - // triggered checkpoint is done. - success = triggerCheckpoint(timestamp, checkpointId); - } - finally { - if (!success) { - savepointPromises.remove(checkpointId); - promise.failure(new Exception("Failed to trigger savepoint")); - } - } - } - else { - throw new IllegalStateException("Duplicate checkpoint ID"); - } - } - catch (Throwable t) { - promise.failure(new Exception("Failed to trigger savepoint", t)); - } - - return promise.future(); - } - - /** - * Resets the state of {@link Execution} instances back to the state of a savepoint. - * - * <p>The execution vertices need to be in state {@link ExecutionState#CREATED} when calling - * this method. The operation might block. Make sure that calls don't block the job manager - * actor. - * - * @param tasks Tasks that will possibly be reset - * @param savepointPath The path of the savepoint to rollback to - * @throws IllegalStateException If coordinator is shut down - * @throws IllegalStateException If mismatch between program and savepoint state - * @throws Exception If savepoint store failure - */ - public void restoreSavepoint( - Map<JobVertexID, ExecutionJobVertex> tasks, - String savepointPath) throws Exception { - - checkNotNull(savepointPath, "Savepoint path"); - - synchronized (lock) { - if (isShutdown()) { - throw new IllegalStateException("CheckpointCoordinator is shut down"); - } - - LOG.info("Rolling back to savepoint '{}'.", savepointPath); - - Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); - - for (TaskState taskState : savepoint.getTaskStates()) { - ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID()); - - if (executionJobVertex != null) { - if (executionJobVertex.getParallelism() != taskState.getParallelism()) { - String msg = String.format("Failed to rollback to savepoint %s. 
" + - "Parallelism mismatch between savepoint state and new program. " + - "Cannot map operator %s with parallelism %d to new program with " + - "parallelism %d. This indicates that the program has been changed " + - "in a non-compatible way after the savepoint.", - savepoint, - taskState.getJobVertexID(), - taskState.getParallelism(), - executionJobVertex.getParallelism()); - - throw new IllegalStateException(msg); - } - - List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions( - numberKeyGroups, - executionJobVertex.getParallelism()); - - for (int i = 0; i < executionJobVertex.getTaskVertices().length; i++) { - SubtaskState subtaskState = taskState.getState(i); - SerializedValue<StateHandle<?>> state = null; - - if (subtaskState != null) { - state = subtaskState.getState(); - } - - Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState - .getUnwrappedKvStates(keyGroupPartitions.get(i)); - - Execution currentExecutionAttempt = executionJobVertex - .getTaskVertices()[i] - .getCurrentExecutionAttempt(); - - currentExecutionAttempt.setInitialState(state, kvStateForTaskMap); - } - } else { - String msg = String.format("Failed to rollback to savepoint %s. " + - "Cannot map old state for task %s to the new program. 
" + - "This indicates that the program has been changed in a " + - "non-compatible way after the savepoint.", savepointPath, - taskState.getJobVertexID()); - throw new IllegalStateException(msg); - } - } - - // Reset the checkpoint ID counter - long nextCheckpointId = savepoint.getCheckpointId() + 1; - checkpointIdCounter.start(); - checkpointIdCounter.setCount(nextCheckpointId); - LOG.info("Reset the checkpoint ID to {}", nextCheckpointId); - - if (savepointRestorePath == null) { - savepointRestorePath = savepointPath; - } - } - } - - // ------------------------------------------------------------------------ - // Checkpoint coordinator callbacks - // ------------------------------------------------------------------------ - - @Override - protected void onShutdown() { - // Fail all outstanding savepoint futures - for (Promise<String> promise : savepointPromises.values()) { - promise.failure(new Exception("Checkpoint coordinator shutdown")); - } - savepointPromises.clear(); - } - - @Override - protected void onCancelCheckpoint(long canceledCheckpointId) { - LOG.info("Cancelling savepoint with checkpoint ID " + canceledCheckpointId); - Promise<String> promise = savepointPromises.remove(canceledCheckpointId); - - if (promise != null) { - promise.failure(new Exception("Savepoint expired before completing")); - } - } - - @Override - protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) { - // Sanity check - Promise<String> promise = savepointPromises.remove(checkpoint.getCheckpointID()); - - if (promise == null) { - LOG.warn("Pending savepoint with ID " + checkpoint.getCheckpointID() + " has been " + - "removed before receiving acknowledgment."); - return; - } - - // Sanity check - if (promise.isCompleted()) { - throw new IllegalStateException("Savepoint promise completed"); - } - - try { - Savepoint savepoint = new SavepointV0( - checkpoint.getCheckpointID(), - checkpoint.getTaskStates().values()); - String path = 
savepointStore.storeSavepoint(savepoint); - promise.success(path); - } - catch (Exception e) { - LOG.warn("Failed to store savepoint.", e); - promise.failure(e); - } - } - - // ------------------------------------------------------------------------ - // Job status listener - // ------------------------------------------------------------------------ - - @Override - public ActorGateway createActivatorDeactivator( - ActorSystem actorSystem, - UUID leaderSessionID) { - - synchronized (lock) { - if (isShutdown()) { - throw new IllegalArgumentException("Checkpoint coordinator is shut down"); - } - - if (getJobStatusListener() == null) { - Props props = Props.create( - SavepointCoordinatorDeActivator.class, - this, - leaderSessionID); - - // wrap the ActorRef in a AkkaActorGateway to support message decoration - setJobStatusListener(new AkkaActorGateway( - actorSystem.actorOf(props), - leaderSessionID)); - } - - return getJobStatusListener(); - } - } - - // ------------------------------------------------------------------------ - // Completed checkpoints - // ------------------------------------------------------------------------ - - private static class IgnoreCheckpointsStore implements CompletedCheckpointStore { - - private static final CompletedCheckpointStore INSTANCE = new IgnoreCheckpointsStore(); - - @Override - public void recover() throws Exception { - } - - @Override - public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - } - - @Override - public CompletedCheckpoint getLatestCheckpoint() throws Exception { - return null; - } - - @Override - public void shutdown() throws Exception { - } - - @Override - public void suspend() throws Exception { - } - - @Override - public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { - return Collections.emptyList(); - } - - @Override - public int getNumberOfRetainedCheckpoints() { - return 0; - } - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java deleted file mode 100644 index 7bd5797..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorDeActivator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.util.Preconditions; - -import java.util.UUID; - -/** - * This actor listens to changes in the JobStatus and deactivates the - * savepoint scheduler and discards all pending checkpoints. - */ -public class SavepointCoordinatorDeActivator extends FlinkUntypedActor { - - private final CheckpointCoordinator coordinator; - private final UUID leaderSessionID; - - public SavepointCoordinatorDeActivator( - SavepointCoordinator coordinator, - UUID leaderSessionID) { - - LOG.info("Create SavepointCoordinatorDeActivator"); - - this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null."); - this.leaderSessionID = leaderSessionID; - } - - @Override - public void handleMessage(Object message) throws Exception { - if (message instanceof ExecutionGraphMessages.JobStatusChanged) { - JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus(); - - if (status != JobStatus.RUNNING) { - // anything other than RUNNING should stop the trigger for now - coordinator.stopCheckpointScheduler(); - } - } - } - - @Override - public UUID getLeaderSessionID() { - return leaderSessionID; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java new file mode 100644 index 0000000..0b7b0c2 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. + */ +public class SavepointLoader { + + /** + * Loads a savepoint back as a {@link CompletedCheckpoint}. + * + * <p>This method verifies that tasks and parallelism still match the savepoint parameters. + * + * @param jobId The JobID of the job to load the savepoint for. + * @param tasks Tasks that will possibly be reset + * @param savepointStore The store that holds the savepoint. 
+ * @param savepointPath The path of the savepoint to roll back to + * + * @throws IllegalStateException If the savepoint state cannot be mapped to the program (unknown task or parallelism mismatch) + * @throws Exception If loading the savepoint from the savepoint store fails + */ + public static CompletedCheckpoint loadAndValidateSavepoint( + JobID jobId, + Map<JobVertexID, ExecutionJobVertex> tasks, + SavepointStore savepointStore, + String savepointPath) throws Exception { + + // (1) load the savepoint + Savepoint savepoint = savepointStore.loadSavepoint(checkNotNull(savepointPath)); + final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size()); + + // (2) validate it (parallelism, etc) + for (TaskState taskState : savepoint.getTaskStates()) { + ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID()); + + if (executionJobVertex != null) { + if (executionJobVertex.getParallelism() == taskState.getParallelism()) { + taskStates.put(taskState.getJobVertexID(), taskState); + } + else { + String msg = String.format("Failed to rollback to savepoint %s. " + + "Parallelism mismatch between savepoint state and new program. " + + "Cannot map operator %s with parallelism %d to new program with " + + "parallelism %d. This indicates that the program has been changed " + + "in a non-compatible way after the savepoint.", + savepointPath, + taskState.getJobVertexID(), + taskState.getParallelism(), + executionJobVertex.getParallelism()); + + throw new IllegalStateException(msg); + } + } else { + String msg = String.format("Failed to rollback to savepoint %s. " + + "Cannot map old state for task %s to the new program.
" + + "This indicates that the program has been changed in a " + + "non-compatible way after the savepoint.", + savepointPath, taskState.getJobVertexID()); + throw new IllegalStateException(msg); + } + } + + // (3) convert to checkpoint so the system can fall back to it + return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, false); + } + + // ------------------------------------------------------------------------ + + private SavepointLoader() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e6ae6ce..82826dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; @@ -215,9 +214,6 @@ public class ExecutionGraph { /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ private CheckpointCoordinator checkpointCoordinator; - /** The coordinator for savepoints, if snapshot checkpoints are enabled */ - private transient SavepointCoordinator savepointCoordinator; - /** Checkpoint stats 
tracker separate from the coordinator in order to be * available after archiving. */ private CheckpointStatsTracker checkpointStatsTracker; @@ -226,7 +222,7 @@ public class ExecutionGraph { private ExecutionContext executionContext; /** Registered KvState instances reported by the TaskManagers. */ - private transient KvStateLocationRegistry kvStateLocationRegistry; + private KvStateLocationRegistry kvStateLocationRegistry; // ------ Fields that are only relevant for archived execution graphs ------------ private String jsonPlan; @@ -392,6 +388,7 @@ public class ExecutionGraph { userClassLoader, checkpointIDCounter, checkpointStore, + savepointStore, recoveryMode, checkpointStatsTracker); @@ -399,25 +396,6 @@ public class ExecutionGraph { // job status changes (running -> on, all other states -> off) registerJobStatusListener( checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID)); - - // Savepoint Coordinator - savepointCoordinator = new SavepointCoordinator( - jobID, - interval, - checkpointTimeout, - numberKeyGroups, - tasksToTrigger, - tasksToWaitFor, - tasksToCommitTo, - userClassLoader, - // Important: this counter needs to be shared with the periodic - // checkpoint coordinator. 
- checkpointIDCounter, - savepointStore, - checkpointStatsTracker); - - registerJobStatusListener(savepointCoordinator - .createActivatorDeactivator(actorSystem, leaderSessionID)); } /** @@ -432,25 +410,16 @@ public class ExecutionGraph { } if (checkpointCoordinator != null) { - checkpointCoordinator.shutdown(); + checkpointCoordinator.suspend(); checkpointCoordinator = null; checkpointStatsTracker = null; } - - if (savepointCoordinator != null) { - savepointCoordinator.shutdown(); - savepointCoordinator = null; - } } public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; } - public SavepointCoordinator getSavepointCoordinator() { - return savepointCoordinator; - } - public KvStateLocationRegistry getKvStateLocationRegistry() { return kvStateLocationRegistry; } @@ -924,17 +893,7 @@ public class ExecutionGraph { // if we have checkpointed state, reload it into the executions if (checkpointCoordinator != null) { - boolean restored = checkpointCoordinator - .restoreLatestCheckpointedState(getAllVertices(), false, false); - - // TODO(uce) Temporary work around to restore initial state on - // failure during recovery. Will be superseded by FLINK-3397. - if (!restored && savepointCoordinator != null) { - String savepointPath = savepointCoordinator.getSavepointRestorePath(); - if (savepointPath != null) { - savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath); - } - } + checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); } } @@ -961,33 +920,6 @@ public class ExecutionGraph { } /** - * Restores the execution state back to a savepoint. - * - * <p>The execution vertices need to be in state {@link ExecutionState#CREATED} when calling - * this method. The operation might block. Make sure that calls don't block the job manager - * actor. - * - * @param savepointPath The path of the savepoint to rollback to. 
- * @throws IllegalStateException If checkpointing is disabled - * @throws IllegalStateException If checkpoint coordinator is shut down - * @throws Exception If failure during rollback - */ - public void restoreSavepoint(String savepointPath) throws Exception { - synchronized (progressLock) { - if (savepointCoordinator != null) { - LOG.info("Restoring savepoint: " + savepointPath + "."); - - savepointCoordinator.restoreSavepoint( - getAllVertices(), savepointPath); - } - else { - // Sanity check - throw new IllegalStateException("Checkpointing disabled."); - } - } - } - - /** * This method cleans fields that are irrelevant for the archived execution attempt. */ public void prepareForArchiving() { @@ -1000,6 +932,7 @@ public class ExecutionGraph { scheduler = null; checkpointCoordinator = null; executionContext = null; + kvStateLocationRegistry = null; for (ExecutionJobVertex vertex : verticesInCreationOrder) { vertex.prepareForArchiving(); @@ -1131,21 +1064,6 @@ public class ExecutionGraph { } catch (Exception e) { LOG.error("Error while cleaning up after execution", e); } - - try { - CheckpointCoordinator coord = this.savepointCoordinator; - this.savepointCoordinator = null; - - if (coord != null) { - if (state.isGloballyTerminalState()) { - coord.shutdown(); - } else { - coord.suspend(); - } - } - } catch (Exception e) { - LOG.error("Error while cleaning up after execution", e); - } } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 01af3c1..9fb01bf 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -41,7 +41,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStoreFactory, SavepointStore} +import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStoreFactory, SavepointStore} import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker} import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException @@ -290,7 +290,7 @@ class JobManager( // failsafe shutdown of the metrics registry try { - metricsRegistry.map(_.shutdown()) + metricsRegistry.foreach(_.shutdown()) } catch { case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) } @@ -686,9 +686,9 @@ class JobManager( case TriggerSavepoint(jobId) => currentJobs.get(jobId) match { case Some((graph, _)) => - val savepointCoordinator = graph.getSavepointCoordinator() + val checkpointCoordinator = graph.getCheckpointCoordinator() - if (savepointCoordinator != null) { + if (checkpointCoordinator != null) { // Immutable copy for the future val senderRef = sender() @@ -696,7 +696,7 @@ class JobManager( try { // Do this async, because checkpoint coordinator operations can // contain blocking calls to the state backend or ZooKeeper. 
- val savepointFuture = savepointCoordinator.triggerSavepoint( + val savepointFuture = checkpointCoordinator.triggerSavepoint( System.currentTimeMillis()) savepointFuture.onComplete { @@ -1294,17 +1294,37 @@ class JobManager( // because it is a blocking operation future { try { + if (isRecovery) { + // this is a recovery of a master failure (this master takes over) executionGraph.restoreLatestCheckpointedState() - } else { + } + else { + // load a savepoint only if this is not starting from a newer checkpoint + // as part of an master failure recovery val snapshotSettings = jobGraph.getSnapshotSettings if (snapshotSettings != null) { val savepointPath = snapshotSettings.getSavepointPath() - - // Reset state back to savepoint + if (savepointPath != null) { + // got a savepoint try { - executionGraph.restoreSavepoint(savepointPath) + log.info(s"Starting job from savepoint '$savepointPath'.") + + // load the savepoint as a checkpoint into the system + val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint( + jobId, executionGraph.getAllVertices, savepointStore, savepointPath) + + executionGraph.getCheckpointCoordinator.getCheckpointStore + .addCheckpoint(savepoint) + + // Reset the checkpoint ID counter + val nextCheckpointId: Long = savepoint.getCheckpointID + 1 + log.info(s"Reset the checkpoint ID to $nextCheckpointId") + executionGraph.getCheckpointCoordinator.getCheckpointIdCounter + .setCount(nextCheckpointId) + + executionGraph.restoreLatestCheckpointedState() } catch { case e: Exception => throw new SuppressRestartsException(e) @@ -1367,21 +1387,13 @@ class JobManager( currentJobs.get(jid) match { case Some((graph, _)) => val checkpointCoordinator = graph.getCheckpointCoordinator() - val savepointCoordinator = graph.getSavepointCoordinator() - if (checkpointCoordinator != null && savepointCoordinator != null) { + if (checkpointCoordinator != null) { future { try { - if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { - // 
OK, this is the common case - } - else { - // Try the savepoint coordinator if the message was not addressed - // to the periodic checkpoint coordinator. - if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) { - log.info("Received message for non-existing checkpoint " + - ackMessage.getCheckpointId) - } + if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { + log.info("Received message for non-existing checkpoint " + + ackMessage.getCheckpointId) } } catch { @@ -1404,21 +1416,13 @@ class JobManager( currentJobs.get(jid) match { case Some((graph, _)) => val checkpointCoordinator = graph.getCheckpointCoordinator() - val savepointCoordinator = graph.getSavepointCoordinator() - if (checkpointCoordinator != null && savepointCoordinator != null) { + if (checkpointCoordinator != null) { future { try { - if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) { + if (!checkpointCoordinator.receiveDeclineMessage(declineMessage)) { - // OK, this is the common case - } - else { - // Try the savepoint coordinator if the message was not addressed - // to the periodic checkpoint coordinator.
- if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) { - log.info("Received message for non-existing checkpoint " + - declineMessage.getCheckpointId) - } + log.info("Received message for non-existing checkpoint " + + declineMessage.getCheckpointId) } } catch { http://git-wip-us.apache.org/repos/asf/flink/blob/76ca1a79/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java new file mode 100644 index 0000000..5772fae --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CheckpointPropertiesTest { + + @Test + public void testCheckpointProperties() { + assertFalse(CheckpointProperties.forStandardCheckpoint().isSavepoint()); + } + + @Test + public void testSavepointProperties() { + assertTrue(CheckpointProperties.forStandardSavepoint().isSavepoint()); + } +}
