[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948836#comment-14948836 ]
ASF GitHub Bot commented on FLINK-2354:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/1153#discussion_r41527578
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Checkpoints are added under a ZNode per job:
+ * <pre>
+ * +----O /flink/checkpoints/<job-id> [persistent]
+ * . |
+ * . +----O /flink/checkpoints/<job-id>/1 [persistent_sequential]
+ * . . .
+ * . . .
+ * . . .
+ * . +----O /flink/checkpoints/<job-id>/N [persistent_sequential]
+ * </pre>
+ *
+ * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
+ * only the latest one is used and older ones are discarded (even if the maximum number
+ * of retained checkpoints is greater than one). The sequential cversion determines which
+ * checkpoint is the latest one.
+ *
+ * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
+ * same program, it is OK to take any valid successful checkpoint as long as the "history" of
+ * checkpoints is consistent. Currently, after recovery we start out with only a single
+ * checkpoint in order to guarantee this.
+ */
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+
+ /** Curator ZooKeeper client */
+ private final CuratorFramework client;
+
+ /** Completed checkpoints in ZooKeeper */
+ private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
+
+ /** The maximum number of checkpoints to retain (at least 1). */
+ private final int maxNumberOfCheckpointsToRetain;
+
+ /** User class loader for discarding {@link CompletedCheckpoint} instances. */
+ private final ClassLoader userClassLoader;
+
+ /** Local completed checkpoints. */
+ private final ArrayDeque<ZooKeeperStateHandle<CompletedCheckpoint>> checkpointStateHandles;
+
+ /**
+ * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+ *
+ * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+ *                                       least 1). Adding more checkpoints than this results
+ *                                       in older checkpoints being discarded. On recovery,
+ *                                       we will only start with a single checkpoint.
+ * @param userClassLoader                The user class loader used to discard checkpoints
+ * @param client                         The Curator ZooKeeper client
+ * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
+ *                                       start with a '/')
+ * @param stateHandleProvider            The state handle provider for checkpoints
+ * @throws Exception
+ */
+ public ZooKeeperCompletedCheckpointStore(
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader,
+ CuratorFramework client,
+ String checkpointsPath,
+            StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {
+
+        checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+
+        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
+        this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
+
+ checkNotNull(client, "Curator client");
+ checkNotNull(checkpointsPath, "Checkpoints path");
+ checkNotNull(stateHandleProvider, "State handle provider");
+
+ // Ensure that the checkpoints path exists
+        client.newNamespaceAwareEnsurePath(checkpointsPath).ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+        this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
+
+ this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
+ this.client, stateHandleProvider);
+
+        this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+
+ LOG.info("Initialized in '{}'.", checkpointsPath);
+ }
+
+ /**
+ * Gets the latest checkpoint from ZooKeeper and removes all others.
+ *
+ * <p><strong>Important</strong>: Even if there is more than one checkpoint in ZooKeeper,
+ * this will only recover the latest one and discard the others. Otherwise, there is no guarantee
+ * that the history of checkpoints is consistent.
+ */
+ @Override
+ public void recover() throws Exception {
+ LOG.info("Recovering checkpoints from ZooKeeper.");
+
+ // Get all there is first
+        List<ZooKeeperStateHandle<CompletedCheckpoint>> initialCheckpoints;
+ while (true) {
+ try {
+                initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
+ break;
+ }
+ catch (ConcurrentModificationException e) {
+                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+ }
+ }
+
+ int numberOfInitialCheckpoints = initialCheckpoints.size();
+
+        LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
+
+ if (numberOfInitialCheckpoints > 0) {
+            // Take the last one. This is the latest checkpoint, because path names are strictly
+            // increasing (checkpoint ID).
+            ZooKeeperStateHandle<CompletedCheckpoint> latest = initialCheckpoints
+                    .get(numberOfInitialCheckpoints - 1);
+
+            CompletedCheckpoint latestCheckpoint = latest.getState(userClassLoader);
+
+            checkpointStateHandles.add(latest);
+
+            LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
+
+            for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
+                try {
+                    removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+                }
+                catch (Exception e) {
+                    LOG.error("Failed to discard checkpoint", e);
+                }
+            }
+ }
+ }
+
+ @Override
+ public int getNextCheckpointID() throws Exception {
+ Stat stat = client.checkExists().forPath("/");
+
+ if (stat == null) {
+            throw new IllegalStateException("Checkpoint root does not exist");
+ }
+ else {
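+            // The cversion of the checkpoint root counts changes to its child nodes and
+            // only ever increases, so it yields monotonically increasing checkpoint IDs.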
+ return stat.getCversion() + 1;
+ }
+ }
+
+ /**
+ * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones.
+ *
+ * @param checkpoint Completed checkpoint to add.
+ */
+ @Override
+ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+ checkNotNull(checkpoint, "Checkpoint");
+
+        // First add the new one. If it fails, we don't want to lose existing data.
+        String path = String.format("/%s", checkpoint.getCheckpointID());
+
+ final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle =
+ checkpointsInZooKeeper.add(path, checkpoint);
+
+ checkpointStateHandles.addLast(stateHandle);
+
+        // Everything worked, let's remove a previous checkpoint if necessary.
+        if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+            removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+ }
+
+ LOG.debug("Added {} to {}.", checkpoint, path);
+ assert (checkpointStateHandles.getLast().equals(stateHandle));
+ }
+
+ @Override
+ public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+ if (checkpointStateHandles.isEmpty()) {
+ return null;
+ }
+ else {
+            return checkpointStateHandles.getLast().getState(userClassLoader);
+ }
+ }
+
+ @Override
+ public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+        List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
+
+ for (ZooKeeperStateHandle<CompletedCheckpoint> stateHandle
+ : checkpointStateHandles) {
+ checkpoints.add(stateHandle.getState(userClassLoader));
+ }
+
+ return checkpoints;
+ }
+
+ @Override
+ public int getNumberOfRetainedCheckpoints() {
+ return checkpointStateHandles.size();
+ }
+
+ @Override
+ public void discardAllCheckpoints() throws Exception {
+ for (ZooKeeperStateHandle<CompletedCheckpoint> checkpoint
+ : checkpointStateHandles) {
+ try {
+                removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
+ }
+ catch (Exception e) {
+ LOG.error("Failed to discard checkpoint.", e);
+ }
+ }
+
+ checkpointStateHandles.clear();
+
+ String path = "/" + client.getNamespace();
+
+ LOG.info("Removing {} from ZooKeeper", path);
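+        // The Curator client is namespaced, but the raw ZooKeeper handle below is not,
+        // hence the explicit namespace prefix on the path.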
+        ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+ }
+
+ /**
+ * Removes the state handle from ZooKeeper, discards the checkpoint, and discards the state handle.
+ */
+ private void removeFromZooKeeperAndDiscardCheckpoint(
+         final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle) throws Exception {
+
+        final BackgroundCallback callback = new BackgroundCallback() {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                try {
+                    if (event.getType() == CuratorEventType.DELETE) {
+                        if (event.getResultCode() == 0) {
+                            // The checkpoint
+                            CompletedCheckpoint checkpoint = stateHandle.getState(userClassLoader);
+
+                            checkpoint.discard(userClassLoader);
+
+                            // Discard the state handle
+                            stateHandle.discardState();
+
+                            // Discard the checkpoint
+                            LOG.debug("Discarded " + checkpoint);
+                        }
+                        else {
+                            throw new IllegalStateException("Unexpected result code " +
+                                    event.getResultCode() + " in '" + event + "' callback.");
+                        }
+                    }
+                    else {
+                        throw new IllegalStateException("Unexpected event type " +
+                                event.getType() + " in '" + event + "' callback.");
+                    }
+                }
+                catch (Exception e) {
+                    LOG.error("Failed to discard checkpoint.", e);
+                }
+            }
+        };
+
+        // Remove the state handle from ZooKeeper first. If this fails, we can still recover, but
+        // if we discard the state first and then fail to remove the handle from ZooKeeper, we end
+        // up in an inconsistent state.
+        checkpointsInZooKeeper.remove(stateHandle.getPathInZooKeeper(), callback);
--- End diff --
I think this requires too much knowledge of the way you want to remove the state from ZooKeeper. I think the ZooKeeperStateHandle was not well thought out. I will remove it and manage (StateHandle, String) pairs in the CheckpointStore manually.
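Roughly what I have in mind, as a sketch only (names illustrative, not the final API; assumes the deque holds plain org.apache.flink.runtime.state.StateHandle instances paired with their path via org.apache.flink.api.java.tuple.Tuple2, and that ZooKeeperStateHandleStore gets a remove(String path) variant):

    // Sketch: keep the ZooKeeper path next to the plain state handle instead of
    // wrapping both in a ZooKeeperStateHandle.
    private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles =
            new ArrayDeque<>();

    private void removeAndDiscardCheckpoint(
            Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
        // Remove the reference from ZooKeeper first, then discard the checkpoint and its state.
        checkpointsInZooKeeper.remove(stateHandleAndPath.f1);
        stateHandleAndPath.f0.getState(userClassLoader).discard(userClassLoader);
        stateHandleAndPath.f0.discardState();
    }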
> Recover running jobs on JobManager failure
> ------------------------------------------
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Affects Versions: 0.10
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the
> leading job manager loses all running jobs when it fails. After a new
> leading job manager is elected, it is not possible to recover any previously
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the
> job graph to a state backend, and 2) a reference to the respective state
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs,
> because they include closures etc.). ZooKeeper is not designed for data of
> this size. The level of indirection via the reference to the state backend
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
> +- currentJobs
> +- job id i
> +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs
> between job managers. The currentJobs node needs to satisfy the following
> invariant: There is a reference to a job graph with id i IFF the respective
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if
> resubmitted). The next step is to backup the runtime state handles of
> checkpoints in a similar manner.
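> As a rough sketch of this indirection (API and node paths illustrative, not the final
> implementation; assumes a Curator client `client` and a StateHandleProvider<JobGraph>):
>
>     // Submission on the leading job manager: persist the job graph in the state backend
>     // and only store the (small) serialized state handle reference in ZooKeeper.
>     StateHandle<JobGraph> handle = stateHandleProvider.createStateHandle(jobGraph);
>     byte[] serializedHandle = InstantiationUtil.serializeObject(handle);
>     client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
>             .forPath("/flink/currentJobs/" + jobGraph.getJobID(), serializedHandle);
>
>     // When the job finishes, remove the reference again so that it is not recovered.
>     client.delete().forPath("/flink/currentJobs/" + jobGraph.getJobID());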
> ---
> This work will be based on [[email protected]]'s implementation of
> FLINK-2291. The leader election service notifies the job manager about
> granted/revoked leadership. This notification happens via Akka and thus
> serially *per* job manager, but results in eventually consistent state
> between job managers. For some periods of time it is therefore possible that a new
> leader has been granted leadership before the old one's leadership has been revoked.
> [[email protected]], can you confirm that leadership does not guarantee
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1's
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to
> coordinate the access to currentJobs. The lock needs to be acquired on
> leadership.
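> A minimal sketch of that guard (lock path illustrative), using one of the Curator lock recipes:
>
>     // Acquired when leadership is granted, released when it is revoked. All reads and
>     // writes of /flink/currentJobs happen only while the lock is held.
>     InterProcessMutex currentJobsLock = new InterProcessMutex(client, "/flink/currentJobsLock");
>     currentJobsLock.acquire();
>     try {
>         // ... add or remove entries under /flink/currentJobs ...
>     }
>     finally {
>         currentJobsLock.release();
>     }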
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting
> interleavings
> - Process failure integration tests with single and multiple running jobs
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)