[FLINK-2354] [runtime] Add job graph and checkpoint recovery This closes #1153.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73c73e92 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73c73e92 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73c73e92 Branch: refs/heads/master Commit: 73c73e92750ab8fb068d0a3cb37afcb642084fc0 Parents: 3aaee1e Author: Ufuk Celebi <[email protected]> Authored: Tue Sep 1 17:25:46 2015 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue Oct 20 00:16:51 2015 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 21 + .../checkpoint/CheckpointCoordinator.java | 223 +++--- .../runtime/checkpoint/CheckpointIDCounter.java | 43 ++ .../checkpoint/CheckpointRecoveryFactory.java | 61 ++ .../runtime/checkpoint/CompletedCheckpoint.java | 81 +++ .../checkpoint/CompletedCheckpointStore.java | 69 ++ .../runtime/checkpoint/PendingCheckpoint.java | 6 +- .../StandaloneCheckpointIDCounter.java | 47 ++ .../StandaloneCheckpointRecoveryFactory.java | 52 ++ .../StandaloneCompletedCheckpointStore.java | 100 +++ .../flink/runtime/checkpoint/StateForTask.java | 21 +- .../checkpoint/SuccessfulCheckpoint.java | 82 --- .../ZooKeeperCheckpointIDCounter.java | 130 ++++ .../ZooKeeperCheckpointRecoveryFactory.java | 66 ++ .../ZooKeeperCompletedCheckpointStore.java | 293 ++++++++ .../runtime/executiongraph/ExecutionGraph.java | 51 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 5 + .../flink/runtime/jobmanager/RecoveryMode.java | 13 + .../StandaloneSubmittedJobGraphStore.java | 65 ++ .../runtime/jobmanager/SubmittedJobGraph.java | 77 ++ .../jobmanager/SubmittedJobGraphStore.java | 93 +++ .../ZooKeeperSubmittedJobGraphStore.java | 379 ++++++++++ .../leaderelection/LeaderElectionService.java | 1 + .../ZooKeeperLeaderElectionService.java | 1 + .../flink/runtime/state/StateBackend.java | 39 + .../state/StateHandleProviderFactory.java | 61 ++ 
.../flink/runtime/util/LeaderElectionUtils.java | 67 -- .../flink/runtime/util/ZooKeeperUtils.java | 138 +++- .../zookeeper/ZooKeeperStateHandleStore.java | 384 ++++++++++ .../flink/runtime/jobmanager/JobInfo.scala | 25 +- .../flink/runtime/jobmanager/JobManager.scala | 594 ++++++++++----- .../runtime/messages/JobManagerMessages.scala | 16 + .../runtime/minicluster/FlinkMiniCluster.scala | 4 +- .../checkpoint/CheckpointCoordinatorTest.java | 190 ++--- .../checkpoint/CheckpointIDCounterTest.java | 194 +++++ .../checkpoint/CheckpointStateRestoreTest.java | 19 +- .../CompletedCheckpointStoreTest.java | 297 ++++++++ .../StandaloneCompletedCheckpointStoreTest.java | 33 + ...ZooKeeperCompletedCheckpointStoreITCase.java | 101 +++ .../BlobLibraryCacheManagerTest.java | 4 + .../PartitionRequestClientFactoryTest.java | 2 +- ...ManagerSubmittedJobGraphsRecoveryITCase.java | 460 ++++++++++++ .../StandaloneSubmittedJobGraphStoreTest.java | 53 ++ .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 283 ++++++++ .../JobManagerLeaderElectionTest.java | 27 +- .../TestingLeaderElectionService.java | 4 + .../ZooKeeperLeaderRetrievalTest.java | 21 +- .../messages/CheckpointMessagesTest.java | 2 +- .../runtime/taskmanager/TaskCancelTest.java | 67 +- .../runtime/testutils/CommonTestUtils.java | 75 +- .../testutils/JobManagerActorTestUtils.java | 166 +++++ .../runtime/testutils/JobManagerProcess.java | 226 ++++++ .../runtime/testutils/TaskManagerProcess.java | 133 ++++ .../flink/runtime/testutils/TestJvmProcess.java | 267 +++++++ .../runtime/testutils/ZooKeeperTestUtils.java | 94 +++ .../ZooKeeperStateHandleStoreITCase.java | 591 +++++++++++++++ .../zookeeper/ZooKeeperTestEnvironment.java | 133 ++++ .../ExecutionGraphRestartTest.scala | 21 +- .../runtime/testingUtils/TestingCluster.scala | 44 +- .../testingUtils/TestingJobManager.scala | 32 +- .../streaming/runtime/tasks/StreamTask.java | 2 +- .../test/util/ForkableFlinkMiniCluster.scala | 36 +- .../checkpointing/StateCheckpoinedITCase.java 
| 391 ---------- .../checkpointing/StateCheckpointedITCase.java | 391 ++++++++++ ...tJobManagerProcessFailureRecoveryITCase.java | 289 ++++++++ .../AbstractProcessFailureRecoveryTest.java | 444 ------------ ...ctTaskManagerProcessFailureRecoveryTest.java | 397 +++++++++++ .../flink/test/recovery/ChaosMonkeyITCase.java | 713 +++++++++++++++++++ .../JobManagerCheckpointRecoveryITCase.java | 395 ++++++++++ ...anagerProcessFailureBatchRecoveryITCase.java | 140 ++++ .../ProcessFailureBatchRecoveryITCase.java | 115 --- .../recovery/ProcessFailureCancelingITCase.java | 4 +- .../ProcessFailureStreamingRecoveryITCase.java | 234 ------ ...anagerProcessFailureBatchRecoveryITCase.java | 115 +++ ...erProcessFailureStreamingRecoveryITCase.java | 234 ++++++ .../ZooKeeperLeaderElectionITCase.java | 61 +- .../org/apache/flink/yarn/YarnJobManager.scala | 11 +- 77 files changed, 8901 insertions(+), 1918 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 5d6f1c7..be730a0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -407,6 +407,12 @@ public final class ConfigConstants { */ public static final String STATE_BACKEND = "state.backend"; + /** + * File system state backend base path for recoverable state handles. Recovery state is written + * to this path and the file state handles are persisted for recovery. 
+ */ + public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery"; + // ----------------------------- Miscellaneous ---------------------------- /** @@ -433,6 +439,15 @@ public final class ConfigConstants { public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader"; + /** ZooKeeper root path (ZNode) for job graphs. */ + public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs"; + + /** ZooKeeper root path (ZNode) for completed checkpoints. */ + public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints"; + + /** ZooKeeper root path (ZNode) for checkpoint counters. */ + public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter"; + public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout"; public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout"; @@ -699,6 +714,12 @@ public final class ConfigConstants { public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader"; + public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs"; + + public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints"; + + public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter"; + public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000; public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000; http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 8f0b19b..fdb59d9 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,16 +21,16 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; - import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; @@ -38,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -48,13 +47,19 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; /** * The checkpoint coordinator coordinates the distributed snapshots of operators and state. * It triggers the checkpoint by sending the messages to the relevant tasks and collects the * checkpoint acknowledgements. 
It also collects and maintains the overview of the state handles * reported by the tasks that acknowledge the checkpoint. + * + * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link + * CompletedCheckpointStore} and {@link CheckpointIDCounter} changes. The default standalone + * implementations don't support any recovery. */ public class CheckpointCoordinator { @@ -79,12 +84,20 @@ public class CheckpointCoordinator { private final ExecutionVertex[] tasksToCommitTo; private final Map<Long, PendingCheckpoint> pendingCheckpoints; - - private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints; + + /** + * Completed checkpoints. Implementations can be blocking. Make sure calls to methods + * accessing this run asynchronously so that they don't block the job manager actor. + */ + private final CompletedCheckpointStore completedCheckpointStore; private final ArrayDeque<Long> recentPendingCheckpoints; - private final AtomicLong checkpointIdCounter = new AtomicLong(1); + /** + * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these + * need to be ascending across job managers.
+ */ + private final CheckpointIDCounter checkpointIdCounter; private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(); @@ -93,8 +106,6 @@ public class CheckpointCoordinator { private final long checkpointTimeout; - private final int numSuccessfulCheckpointsToRetain; - private TimerTask periodicScheduler; private ActorGateway jobStatusListener; @@ -110,61 +121,62 @@ public class CheckpointCoordinator { public CheckpointCoordinator( JobID job, - int numSuccessfulCheckpointsToRetain, long checkpointTimeout, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, - ClassLoader userClassLoader) { + ClassLoader userClassLoader, + CheckpointIDCounter checkpointIDCounter, + CompletedCheckpointStore completedCheckpointStore, + RecoveryMode recoveryMode) throws Exception { - // some sanity checks - if (job == null || tasksToTrigger == null || - tasksToWaitFor == null || tasksToCommitTo == null) { - throw new NullPointerException(); - } - if (numSuccessfulCheckpointsToRetain < 1) { - throw new IllegalArgumentException("Must retain at least one successful checkpoint"); - } - if (checkpointTimeout < 1) { - throw new IllegalArgumentException("Checkpoint timeout must be larger than zero"); - } + // Sanity check + checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); - this.job = job; - this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain; + this.job = checkNotNull(job); this.checkpointTimeout = checkpointTimeout; - this.tasksToTrigger = tasksToTrigger; - this.tasksToWaitFor = tasksToWaitFor; - this.tasksToCommitTo = tasksToCommitTo; + this.tasksToTrigger = checkNotNull(tasksToTrigger); + this.tasksToWaitFor = checkNotNull(tasksToWaitFor); + this.tasksToCommitTo = checkNotNull(tasksToCommitTo); this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>(); - this.completedCheckpoints = new 
ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1); + this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS); this.userClassLoader = userClassLoader; + this.checkpointIdCounter = checkNotNull(checkpointIDCounter); + checkpointIDCounter.start(); - timer = new Timer("Checkpoint Timer", true); + this.timer = new Timer("Checkpoint Timer", true); - // Add shutdown hook to clean up state handles - shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - CheckpointCoordinator.this.shutdown(); - } - catch (Throwable t) { - LOG.error("Error during shutdown of blob service via JVM shutdown hook: " + - t.getMessage(), t); + if (recoveryMode == RecoveryMode.STANDALONE) { + // Add shutdown hook to clean up state handles when no checkpoint recovery is + // possible. In case of another configured recovery mode, the checkpoints need to be + // available for the standby job managers. + this.shutdownHook = new Thread(new Runnable() { + @Override + public void run() { + try { + CheckpointCoordinator.this.shutdown(); + } + catch (Throwable t) { + LOG.error("Error during shutdown of checkpoint coordniator via " + + "JVM shutdown hook: " + t.getMessage(), t); + } } - } - }); + }); - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - } - catch (IllegalStateException ignored) { - // JVM is already shutting down. No need to do anything. + try { + // Add JVM shutdown hook to call shutdown of service + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + catch (IllegalStateException ignored) { + // JVM is already shutting down. No need to do anything. 
+ } + catch (Throwable t) { + LOG.error("Cannot register checkpoint coordinator shutdown hook.", t); + } } - catch (Throwable t) { - LOG.error("Cannot register checkpoint coordinator shutdown hook.", t); + else { + this.shutdownHook = null; } } @@ -178,41 +190,39 @@ public class CheckpointCoordinator { * After this method has been called, the coordinator does not accept and further * messages and cannot trigger any further checkpoints. */ - public void shutdown() { + public void shutdown() throws Exception { synchronized (lock) { - try { - if (shutdown) { - return; - } - shutdown = true; - LOG.info("Stopping checkpoint coordinator for job " + job); - - // shut down the thread that handles the timeouts - timer.cancel(); - - // make sure that the actor does not linger - if (jobStatusListener != null) { - jobStatusListener.tell(PoisonPill.getInstance()); - jobStatusListener = null; - } - - // the scheduling thread needs also to go away - if (periodicScheduler != null) { - periodicScheduler.cancel(); - periodicScheduler = null; - } - - // clear and discard all pending checkpoints - for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.discard(userClassLoader, true); - } - pendingCheckpoints.clear(); - - // clean and discard all successful checkpoints - for (SuccessfulCheckpoint checkpoint : completedCheckpoints) { - checkpoint.discard(userClassLoader); + try { + if (!shutdown) { + shutdown = true; + LOG.info("Stopping checkpoint coordinator for job " + job); + + // shut down the thread that handles the timeouts + timer.cancel(); + + // make sure that the actor does not linger + if (jobStatusListener != null) { + jobStatusListener.tell(PoisonPill.getInstance()); + jobStatusListener = null; + } + + // the scheduling thread needs also to go away + if (periodicScheduler != null) { + periodicScheduler.cancel(); + periodicScheduler = null; + } + + checkpointIdCounter.stop(); + + // clear and discard all pending checkpoints + for (PendingCheckpoint pending 
: pendingCheckpoints.values()) { + pending.discard(userClassLoader, true); + } + pendingCheckpoints.clear(); + + // clean and discard all successful checkpoints + completedCheckpointStore.discardAllCheckpoints(); } - completedCheckpoints.clear(); } finally { // Remove shutdown hook to prevent resource leaks, unless this is invoked by the @@ -244,7 +254,7 @@ public class CheckpointCoordinator { * Triggers a new checkpoint and uses the current system time as the * checkpoint time. */ - public void triggerCheckpoint() { + public void triggerCheckpoint() throws Exception { triggerCheckpoint(System.currentTimeMillis()); } @@ -254,7 +264,7 @@ public class CheckpointCoordinator { * * @param timestamp The timestamp for the checkpoint. */ - public boolean triggerCheckpoint(final long timestamp) { + public boolean triggerCheckpoint(final long timestamp) throws Exception { if (shutdown) { LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown."); return false; @@ -354,7 +364,7 @@ public class CheckpointCoordinator { } } - public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) { + public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception { if (shutdown || message == null) { return; } @@ -365,7 +375,7 @@ public class CheckpointCoordinator { final long checkpointId = message.getCheckpointId(); - SuccessfulCheckpoint completed = null; + CompletedCheckpoint completed = null; PendingCheckpoint checkpoint; synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we @@ -380,13 +390,13 @@ public class CheckpointCoordinator { if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) { if (checkpoint.isFullyAcknowledged()) { + completed = checkpoint.toCompletedCheckpoint(); + + completedCheckpointStore.addCheckpoint(completed); + LOG.info("Completed checkpoint " + checkpointId); + LOG.debug(completed.getStates().toString()); - completed = 
checkpoint.toCompletedCheckpoint(); - completedCheckpoints.addLast(completed); - if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) { - completedCheckpoints.removeFirst().discard(userClassLoader); - } pendingCheckpoints.remove(checkpointId); rememberRecentCheckpointId(checkpointId); @@ -456,25 +466,30 @@ public class CheckpointCoordinator { // Checkpoint State Restoring // -------------------------------------------------------------------------------------------- - public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks, - boolean errorIfNoCheckpoint, - boolean allOrNothingState) throws Exception { + public void restoreLatestCheckpointedState( + Map<JobVertexID, ExecutionJobVertex> tasks, + boolean errorIfNoCheckpoint, + boolean allOrNothingState) throws Exception { + synchronized (lock) { if (shutdown) { throw new IllegalStateException("CheckpointCoordinator is shut down"); } - - if (completedCheckpoints.isEmpty()) { + + // Recover the checkpoints + completedCheckpointStore.recover(); + + // restore from the latest checkpoint + CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); + + if (latest == null) { if (errorIfNoCheckpoint) { throw new IllegalStateException("No completed checkpoint available"); } else { return; } } - - // restore from the latest checkpoint - SuccessfulCheckpoint latest = completedCheckpoints.getLast(); - + if (allOrNothingState) { Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>(); @@ -519,7 +534,9 @@ public class CheckpointCoordinator { } public int getNumberOfRetainedSuccessfulCheckpoints() { - return this.completedCheckpoints.size(); + synchronized (lock) { + return completedCheckpointStore.getNumberOfRetainedCheckpoints(); + } } public Map<Long, PendingCheckpoint> getPendingCheckpoints() { @@ -528,9 +545,9 @@ public class CheckpointCoordinator { } } - public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() { + public 
List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception { synchronized (lock) { - return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints); + return completedCheckpointStore.getAllCheckpoints(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java new file mode 100644 index 0000000..34b7946 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +/** + * A checkpoint ID counter. + */ +public interface CheckpointIDCounter { + + /** + * Starts the {@link CheckpointIDCounter} service. + */ + void start() throws Exception; + + /** + * Stops the {@link CheckpointIDCounter} service. 
+ */ + void stop() throws Exception; + + /** + * Atomically increments the current checkpoint ID. + * + * @return The previous checkpoint ID + */ + long getAndIncrement() throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java new file mode 100644 index 0000000..aa6e94b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; + +/** + * A factory for per Job checkpoint recovery components. + */ +public interface CheckpointRecoveryFactory { + + /** + * The number of {@link CompletedCheckpoint} instances to retain. 
+ */ + int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1; + + /** + * Starts the {@link CheckpointRecoveryFactory} service. + */ + void start(); + + /** + * Stops the {@link CheckpointRecoveryFactory} service. + */ + void stop(); + + /** + * Creates a {@link CompletedCheckpointStore} instance for a job. + * + * @param jobId Job ID to recover checkpoints for + * @param userClassLoader User code class loader of the job + * @return {@link CompletedCheckpointStore} instance for the job + */ + CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader) + throws Exception; + + /** + * Creates a {@link CheckpointIDCounter} instance for a job. + * + * @param jobId Job ID to recover checkpoints for + * @return {@link CheckpointIDCounter} instance for the job + */ + CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java new file mode 100644 index 0000000..ea3c26d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * A completed checkpoint describes a checkpoint that has been acknowledged by all required tasks + * (with their state) and is therefore considered complete. + */ +public class CompletedCheckpoint implements Serializable { + + private static final long serialVersionUID = -8360248179615702014L; + + private final JobID job; + + private final long checkpointID; + + private final long timestamp; + + private final ArrayList<StateForTask> states; + + public CompletedCheckpoint(JobID job, long checkpointID, long timestamp, ArrayList<StateForTask> states) { + this.job = job; + this.checkpointID = checkpointID; + this.timestamp = timestamp; + this.states = states; + } + + public JobID getJobId() { + return job; + } + + public long getCheckpointID() { + return checkpointID; + } + + public long getTimestamp() { + return timestamp; + } + + public List<StateForTask> getStates() { + return states; + } + + // -------------------------------------------------------------------------------------------- + + public void discard(ClassLoader userClassLoader) { + for(StateForTask state: states){ + state.discard(userClassLoader); + } + states.clear(); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java new file mode 100644 index 0000000..d024aea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import java.util.List; + +/** + * A bounded LIFO-queue of {@link CompletedCheckpoint} instances. + */ +public interface CompletedCheckpointStore { + + /** + * Recover available {@link CompletedCheckpoint} instances. + * + * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest + * available checkpoint. + */ + void recover() throws Exception; + + /** + * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. + * + * <p>Only a bounded number of checkpoints is kept. 
When exceeding the maximum number of + * retained checkpoints, the oldest one will be discarded via {@link + * CompletedCheckpoint#discard(ClassLoader)}. + */ + void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception; + + /** + * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was + * added. + */ + CompletedCheckpoint getLatestCheckpoint() throws Exception; + + /** + * Discards all added {@link CompletedCheckpoint} instances via {@link + * CompletedCheckpoint#discard(ClassLoader)}. + */ + void discardAllCheckpoints() throws Exception; + + /** + * Returns all {@link CompletedCheckpoint} instances. + * + * <p>Returns an empty list if no checkpoint has been added yet. + */ + List<CompletedCheckpoint> getAllCheckpoints() throws Exception; + + /** + * Returns the current number of retained checkpoints. + */ + int getNumberOfRetainedCheckpoints(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 370ae50..81159f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -31,7 +31,7 @@ import org.apache.flink.util.SerializedValue; /** * A pending checkpoint is a checkpoint that has been started, but has not been * acknowledged by all tasks that need to acknowledge it. Once all tasks have - * acknowledged it, it becomes a {@link SuccessfulCheckpoint}. + * acknowledged it, it becomes a {@link CompletedCheckpoint}. 
* * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the * state handles always as serialized values, never as actual values.</p> @@ -109,13 +109,13 @@ public class PendingCheckpoint { return collectedStates; } - public SuccessfulCheckpoint toCompletedCheckpoint() { + public CompletedCheckpoint toCompletedCheckpoint() { synchronized (lock) { if (discarded) { throw new IllegalStateException("pending checkpoint is discarded"); } if (notYetAcknowledgedTasks.isEmpty()) { - SuccessfulCheckpoint completed = new SuccessfulCheckpoint(jobId, checkpointId, + CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId, checkpointTimestamp, new ArrayList<StateForTask>(collectedStates)); discard(null, false); http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java new file mode 100644 index 0000000..052d743 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not + * recoverable in this recovery mode. + */ +public class StandaloneCheckpointIDCounter implements CheckpointIDCounter { + + private final AtomicLong checkpointIdCounter = new AtomicLong(1); + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + + @Override + public long getAndIncrement() throws Exception { + return checkpointIdCounter.getAndIncrement(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java new file mode 100644 index 0000000..324a0be --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +/** + * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}. + */ +public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory { + + @Override + public void start() { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader) + throws Exception { + + return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory + .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader); + } + + @Override + public CheckpointIDCounter createCheckpointIDCounter(JobID ignored) { + return new StandaloneCheckpointIDCounter(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java new file mode 100644 index 0000000..c31606a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}. + */ +class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** The completed checkpoints. 
*/ + private final ArrayDeque<CompletedCheckpoint> checkpoints; + + /** + * Creates {@link StandaloneCompletedCheckpointStore}. + * + * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at + * least 1). Adding more checkpoints than this results + * in older checkpoints being discarded. + * @param userClassLoader The user class loader used to discard checkpoints + */ + public StandaloneCompletedCheckpointStore( + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader) { + + checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); + + this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; + this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); + + this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); + } + + @Override + public void recover() throws Exception { + // Nothing to do + } + + @Override + public void addCheckpoint(CompletedCheckpoint checkpoint) { + checkpoints.addLast(checkpoint); + if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { + checkpoints.removeFirst().discard(userClassLoader); + } + } + + @Override + public CompletedCheckpoint getLatestCheckpoint() { + return checkpoints.isEmpty() ? 
null : checkpoints.getLast(); + } + + @Override + public List<CompletedCheckpoint> getAllCheckpoints() { + return new ArrayList<>(checkpoints); + } + + @Override + public int getNumberOfRetainedCheckpoints() { + return checkpoints.size(); + } + + @Override + public void discardAllCheckpoints() { + for (CompletedCheckpoint checkpoint : checkpoints) { + checkpoint.discard(userClassLoader); + } + + checkpoints.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java index 120c503..d1428f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java @@ -24,6 +24,11 @@ import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Simple bean to describe the state belonging to a parallel operator. * Since we hold the state across execution attempts, we identify a task by its @@ -34,8 +39,10 @@ import org.slf4j.LoggerFactory; * Furthermore, the state may involve user-defined classes that are not accessible without * the respective classloader. 
*/ -public class StateForTask { - +public class StateForTask implements Serializable { + + private static final long serialVersionUID = -2394696997971923995L; + private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class); /** The state of the parallel operator */ @@ -48,12 +55,10 @@ public class StateForTask { private final int subtask; public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) { - if (state == null || operatorId == null || subtask < 0) { - throw new IllegalArgumentException(); - } - - this.state = state; - this.operatorId = operatorId; + this.state = checkNotNull(state, "State"); + this.operatorId = checkNotNull(operatorId, "Operator ID"); + + checkArgument(subtask >= 0, "Negative subtask index"); this.subtask = subtask; } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java deleted file mode 100644 index be0b301..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.flink.api.common.JobID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) - * and that is considered completed. - */ -public class SuccessfulCheckpoint { - - private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class); - - private final JobID job; - - private final long checkpointID; - - private final long timestamp; - - private final List<StateForTask> states; - - - public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) { - this.job = job; - this.checkpointID = checkpointID; - this.timestamp = timestamp; - this.states = states; - } - - public JobID getJobId() { - return job; - } - - public long getCheckpointID() { - return checkpointID; - } - - public long getTimestamp() { - return timestamp; - } - - public List<StateForTask> getStates() { - return states; - } - - // -------------------------------------------------------------------------------------------- - - public void discard(ClassLoader userClassLoader) { - for(StateForTask state: states){ - state.discard(userClassLoader); - } - states.clear(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java new file mode 100644 index 0000000..6673050 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * <p>Each counter creates a ZNode: + * <pre> + * +----O /flink/checkpoint-counter/<job-id> 1 [persistent] + * . + * . + * . + * +----O /flink/checkpoint-counter/<job-id> N [persistent] + * </pre> + * + * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Path of the shared count */ + private final String counterPath; + + /** Curator recipe for shared counts */ + private final SharedCount sharedCount; + + /** Connection state listener to monitor the client connection */ + private final SharedCountConnectionStateListener connStateListener = + new SharedCountConnectionStateListener(); + + /** + * Creates a {@link ZooKeeperCheckpointIDCounter} instance. + * + * @param client Curator ZooKeeper client + * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. 
+ * @throws Exception + */ + public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception { + this.client = checkNotNull(client, "Curator client"); + this.counterPath = checkNotNull(counterPath, "Counter path"); + this.sharedCount = new SharedCount(client, counterPath, 1); + } + + @Override + public void start() throws Exception { + sharedCount.start(); + client.getConnectionStateListenable().addListener(connStateListener); + } + + @Override + public void stop() throws Exception { + sharedCount.close(); + client.getConnectionStateListenable().removeListener(connStateListener); + + LOG.info("Removing {} from ZooKeeper", counterPath); + client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); + } + + @Override + public long getAndIncrement() throws Exception { + while (true) { + ConnectionState connState = connStateListener.getLastState(); + + if (connState != null) { + throw new IllegalStateException("Connection state: " + connState); + } + + VersionedValue<Integer> current = sharedCount.getVersionedValue(); + + Integer newCount = current.getValue() + 1; + + if (sharedCount.trySetCount(current, newCount)) { + return current.getValue(); + } + } + } + + /** + * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link + * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper. 
+ */ + private class SharedCountConnectionStateListener implements ConnectionStateListener { + + private volatile ConnectionState lastState; + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) { + lastState = newState; + } + } + + private ConnectionState getLastState() { + return lastState; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java new file mode 100644 index 0000000..2659e7e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}. + */ +public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory { + + private final CuratorFramework client; + + private final Configuration config; + + public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) { + this.client = checkNotNull(client, "Curator client"); + this.config = checkNotNull(config, "Configuration"); + } + + @Override + public void start() { + // Nothing to do + } + + @Override + public void stop() { + client.close(); + } + + @Override + public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader) + throws Exception { + + return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId, + NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader); + } + + @Override + public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception { + return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java new file mode 100644 index 0000000..62ab440 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. 
+ * + * <p>Checkpoints are added under a ZNode per job: + * <pre> + * +----O /flink/checkpoints/<job-id> [persistent] + * . | + * . +----O /flink/checkpoints/<job-id>/1 [persistent] + * . . . + * . . . + * . . . + * . +----O /flink/checkpoints/<job-id>/N [persistent] + * </pre> + * + * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). + * + * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint to circumvent those situations. + */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles; + + /** + * Creates a {@link ZooKeeperCompletedCheckpointStore} instance. + * + * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at + * least 1). Adding more checkpoints than this results + * in older checkpoints being discarded. On recovery, + * we will only start with a single checkpoint. 
+ * @param userClassLoader The user class loader used to discard checkpoints + * @param client The Curator ZooKeeper client + * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to + * start with a '/') + * @param stateHandleProvider The state handle provider for checkpoints + * @throws Exception + */ + public ZooKeeperCompletedCheckpointStore( + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader, + CuratorFramework client, + String checkpointsPath, + StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception { + + checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); + + this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; + this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); + + checkNotNull(client, "Curator client"); + checkNotNull(checkpointsPath, "Checkpoints path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Ensure that the checkpoints path exists + client.newNamespaceAwareEnsurePath(checkpointsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); + + this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>( + this.client, stateHandleProvider); + + this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); + + LOG.info("Initialized in '{}'.", checkpointsPath); + } + + /** + * Gets the latest checkpoint from ZooKeeper and removes all others. + * + * <p><strong>Important</strong>: Even if there are more than one checkpoint in ZooKeeper, + * this will only recover the latest and discard the others. Otherwise, there is no guarantee + * that the history of checkpoints is consistent. 
+ */ + @Override + public void recover() throws Exception { + LOG.info("Recovering checkpoints from ZooKeeper."); + + // Get all there is first + List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints; + while (true) { + try { + initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName(); + break; + } + catch (ConcurrentModificationException e) { + LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); + } + } + + int numberOfInitialCheckpoints = initialCheckpoints.size(); + + LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); + + if (numberOfInitialCheckpoints > 0) { + // Take the last one. This is the latest checkpoint, because path names are zero-padded + // checkpoint IDs (see addCheckpoint), so name order equals checkpoint ID order. + Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints + .get(numberOfInitialCheckpoints - 1); + + CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader); + + checkpointStateHandles.add(latest); + + LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint); + + for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) { + try { + removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i)); + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint", e); + } + } + } + } + + /** + * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones. + * + * @param checkpoint Completed checkpoint to add. + */ + @Override + public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { + checkNotNull(checkpoint, "Checkpoint"); + + // First add the new one. If it fails, we don't want to lose existing data.
+ String path = String.format("/%019d", checkpoint.getCheckpointID()); // Zero-padded so that name order (getAllSortedByName) equals checkpoint ID order; "/10" would otherwise sort before "/9". + + final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint); + + checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path)); + + // Everything worked, let's remove a previous checkpoint if necessary. + if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { + removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst()); + } + + LOG.debug("Added {} to {}.", checkpoint, path); + } + + @Override + public CompletedCheckpoint getLatestCheckpoint() throws Exception { + if (checkpointStateHandles.isEmpty()) { + return null; + } + else { + return checkpointStateHandles.getLast().f0.getState(userClassLoader); + } + } + + @Override + public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { + List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size()); + + for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) { + checkpoints.add(stateHandle.f0.getState(userClassLoader)); + } + + return checkpoints; + } + + @Override + public int getNumberOfRetainedCheckpoints() { + return checkpointStateHandles.size(); + } + + @Override + public void discardAllCheckpoints() throws Exception { + for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) { + try { + removeFromZooKeeperAndDiscardCheckpoint(checkpoint); + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint.", e); + } + } + + checkpointStateHandles.clear(); + + String path = "/" + client.getNamespace(); + + LOG.info("Removing {} from ZooKeeper", path); + ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); + } + + /** + * Removes the state handle from ZooKeeper and, once the asynchronous delete succeeds, discards the checkpoint and its state handle.
+ */ + private void removeFromZooKeeperAndDiscardCheckpoint( + final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception { + + final BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + try { + if (event.getType() == CuratorEventType.DELETE) { + if (event.getResultCode() == 0) { + // Load the checkpoint from its state handle and discard it + CompletedCheckpoint checkpoint = stateHandleAndPath + .f0.getState(userClassLoader); + + checkpoint.discard(userClassLoader); + + // Discard the state handle + stateHandleAndPath.f0.discardState(); + + // Log the discarded checkpoint + LOG.debug("Discarded {}", checkpoint); + } + else { + throw new IllegalStateException("Unexpected result code " + + event.getResultCode() + " in '" + event + "' callback."); + } + } + else { + throw new IllegalStateException("Unexpected event type " + + event.getType() + " in '" + event + "' callback."); + } + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint.", e); + } + } + }; + + // Remove state handle from ZooKeeper first. If this fails, we can still recover, but if + // we remove a state handle and fail to remove it from ZooKeeper, we end up in an + // inconsistent state.
+ checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index ef00484..9430d80 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -39,6 +41,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -110,8 +113,6 @@ public class ExecutionGraph implements Serializable { /** The log object used for debugging.
*/ static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class); - private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1; - // -------------------------------------------------------------------------------------------- /** The lock used to secure all access to mutable fields, especially the tracking of progress @@ -347,7 +348,11 @@ public class ExecutionGraph implements Serializable { List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, ActorSystem actorSystem, - UUID leaderSessionID) { + UUID leaderSessionID, + CheckpointIDCounter checkpointIDCounter, + CompletedCheckpointStore completedCheckpointStore, + RecoveryMode recoveryMode) throws Exception { + // simple sanity checks if (interval < 10 || checkpointTimeout < 10) { throw new IllegalArgumentException(); @@ -367,12 +372,14 @@ public class ExecutionGraph implements Serializable { snapshotCheckpointsEnabled = true; checkpointCoordinator = new CheckpointCoordinator( jobID, - NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, - userClassLoader); + userClassLoader, + checkpointIDCounter, + completedCheckpointStore, + recoveryMode); // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) @@ -382,8 +389,14 @@ public class ExecutionGraph implements Serializable { interval, leaderSessionID)); } - - public void disableSnaphotCheckpointing() { + + /** + * Disables checkpointing. Throws an {@link IllegalStateException} unless the job is in {@link JobStatus#CREATED} state. + * + * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this + * method don't block the job manager actor and run asynchronously.
+ */ + public void disableSnaphotCheckpointing() throws Exception { if (state != JobStatus.CREATED) { throw new IllegalStateException("Job must be in CREATED state"); } @@ -773,6 +786,20 @@ public class ExecutionGraph implements Serializable { } /** + * Restores the latest checkpointed state. + * + * <p>The recovery of checkpoints might block. Make sure that calls to this method don't + * block the job manager actor and run asynchronously. The restore runs while holding {@code progressLock} and is a no-op if no checkpoint coordinator is set. + */ + public void restoreLatestCheckpointedState() throws Exception { + synchronized (progressLock) { + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); + } + } + } + + /** * This method cleans fields that are irrelevant for the archived execution attempt. */ public void prepareForArchiving() { @@ -886,7 +913,13 @@ public class ExecutionGraph implements Serializable { } }, executionContext); } else { - restart(); + future(new Callable<Object>() { + @Override + public Object call() throws Exception { + restart(); + return null; + } + }, executionContext); } break; } @@ -906,7 +939,7 @@ public class ExecutionGraph implements Serializable { } } } - + private void postRunCleanup() { try { CheckpointCoordinator coord = this.checkpointCoordinator; http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 6b36e2d..a64d63c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -538,4 +538,9 @@ public class JobGraph implements Serializable { } } } + + @Override + public String toString() { + return "JobGraph(jobId: " + jobID + ")";
+ } } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java index 2e75b19..17322d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java @@ -35,6 +35,19 @@ public enum RecoveryMode { ZOOKEEPER; /** + * Returns the configured {@link RecoveryMode}. + * + * @param config The config to parse + * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not + * configured. The value is upper-cased before lookup; a value that names no constant of this enum makes {@code valueOf} throw an {@link IllegalArgumentException}. + */ + public static RecoveryMode fromConfig(Configuration config) { + return RecoveryMode.valueOf(config.getString( + ConfigConstants.RECOVERY_MODE, + ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase()); + } + + /** * Returns true if the defined recovery mode supports high availability. * * @param configuration Configuration which contains the recovery mode http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java new file mode 100644 index 0000000..db36f92 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import scala.Option; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * <p>All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. 
+ */ +public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore { + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + // Nothing to do: the listener is not kept, because this store never holds job graphs + } + + @Override + public void stop() { + // Nothing to do: this store holds no resources + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + // Nothing to do: job graphs are not persisted in this recovery mode + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + // Nothing to do: nothing was persisted, so there is nothing to remove + } + + @Override + public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception { + return Option.empty(); // Nothing is persisted, so no job graph can be recovered + } + + @Override + public List<SubmittedJobGraph> recoverJobGraphs() throws Exception { + return Collections.emptyList(); // Nothing is persisted, so no job graphs can be recovered + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java new file mode 100644 index 0000000..48da3b8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A recoverable {@link JobGraph} and {@link JobInfo}. + */ +public class SubmittedJobGraph implements Serializable { + + private static final long serialVersionUID = 2836099271734771825L; + + /** The submitted {@link JobGraph}. */ + private final JobGraph jobGraph; + + /** The {@link JobInfo}. */ + private final JobInfo jobInfo; + + /** + * Creates a {@link SubmittedJobGraph}. Both arguments must be non-null. + * + * @param jobGraph The submitted {@link JobGraph} + * @param jobInfo The {@link JobInfo} + */ + public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) { + this.jobGraph = checkNotNull(jobGraph, "Job graph"); + this.jobInfo = checkNotNull(jobInfo, "Job info"); + } + + /** + * Returns the submitted {@link JobGraph}. + */ + public JobGraph getJobGraph() { + return jobGraph; + } + + /** + * Returns the {@link JobID} of the submitted {@link JobGraph}. + */ + public JobID getJobId() { + return jobGraph.getJobID(); + } + + /** + * Returns the {@link JobInfo} of the client who submitted the {@link JobGraph}.
+ */ + public JobInfo getJobInfo() throws Exception { + return jobInfo; + } + + @Override + public String toString() { + return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java new file mode 100644 index 0000000..bd628cd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import scala.Option; + +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for recovery. + */ +public interface SubmittedJobGraphStore { + + /** + * Starts the {@link SubmittedJobGraphStore} service. The given listener is used to report job graphs added or removed by other {@link SubmittedJobGraphStore} instances.
+ */ + void start(SubmittedJobGraphListener jobGraphListener) throws Exception; + + /** + * Stops the {@link SubmittedJobGraphStore} service. + */ + void stop() throws Exception; + + /** + * Returns a list of all submitted {@link JobGraph} instances. + */ + List<SubmittedJobGraph> recoverJobGraphs() throws Exception; + + /** + * Returns the {@link SubmittedJobGraph} with the given {@link JobID}. + * + * <p>Returns an empty {@link Option} if no job graph with the given ID can be recovered (the standalone store always does so; NOTE(review): confirm no implementation throws instead). + */ + Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception; + + /** + * Adds the {@link SubmittedJobGraph} instance. + * + * <p>If a job graph with the same {@link JobID} exists, it is replaced. + */ + void putJobGraph(SubmittedJobGraph jobGraph) throws Exception; + + /** + * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists. + */ + void removeJobGraph(JobID jobId) throws Exception; + + /** + * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between + * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers). + */ + interface SubmittedJobGraphListener { + + /** + * Callback for {@link SubmittedJobGraph} instances added by a different {@link + * SubmittedJobGraphStore} instance. + * + * <p><strong>Important:</strong> It is possible to get false positives and be notified + * about a job graph that was added by this instance. + * + * @param jobId The {@link JobID} of the added job graph + */ + void onAddedJobGraph(JobID jobId); + + /** + * Callback for {@link SubmittedJobGraph} instances removed by a different {@link + * SubmittedJobGraphStore} instance. + * + * @param jobId The {@link JobID} of the removed job graph + */ + void onRemovedJobGraph(JobID jobId); + + } + +}
