[
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943207#comment-14943207
]
ASF GitHub Bot commented on FLINK-2354:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1153#discussion_r41129801
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * <p>Added state is persisted via {@link StateHandle}s, which in turn are
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of
data in ZooKeeper
+ * small. ZooKeeper is build for data in the KB range whereas state can
grow to multiple MBs.
+ *
+ * <p>State modifications require some care, because it is possible that
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * <p>ZooKeeper holds the ground truth about state handles, i.e. the
following holds:
+ *
+ * <pre>
+ * State handle in ZooKeeper => State handle exists
+ * </pre>
+ *
+ * But not:
+ *
+ * <pre>
+ * State handle exists => State handle in ZooKeeper
+ * </pre>
+ *
+ * There can be lingering state handles when failures happen during
operation. They
+ * need to be cleaned up manually (see <a
href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+ /** Curator ZooKeeper client */
+ private final CuratorFramework client;
+
+ /** State handle provider */
+ private final StateHandleProvider<T> stateHandleProvider;
+
+ /**
+ * Creates a {@link ZooKeeperStateHandleStore}.
+ *
+ * @param client The Curator ZooKeeper client.
<strong>Important:</strong> It is
+ * expected that the client's namespace
ensures that the root
+ * path is exclusive for all state handles
managed by this
+ * instance, e.g.
<code>client.usingNamespace("/stateHandles")</code>
+ * @param stateHandleProvider The state handle provider for the state
+ */
+ public ZooKeeperStateHandleStore(
+ CuratorFramework client,
+ StateHandleProvider<T> stateHandleProvider) {
+
+ this.client = checkNotNull(client, "Curator client");
+ this.stateHandleProvider = checkNotNull(stateHandleProvider,
"State handle provider");
+ }
+
+ /**
+ * Creates a state handle and stores it in ZooKeeper with create mode
{@link
+ * CreateMode#PERSISTENT}.
+ *
+ * @see #add(String, Serializable, CreateMode)
+ */
+ public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state)
throws Exception {
+ return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Creates a state handle and stores it in ZooKeeper.
+ *
+ * <p><strong>Important</strong>: This will <em>not</em> store the
actual state in
+ * ZooKeeper, but create a state handle and store it in ZooKeeper. This
level of indirection
+ * makes sure that data in ZooKeeper is small.
+ *
+ * @param pathInZooKeeper Destination path in ZooKeeper (expected to
*not* exist yet and
+ * start with a '/')
+ * @param state State to be added
+ * @param createMode The create mode for the new path in ZooKeeper
+ * @return Created {@link ZooKeeperStateHandle}
+ * @throws Exception
+ */
+ public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state,
CreateMode createMode) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(state, "State");
+
+ // Create the state handle. Nothing persisted yet.
+ StateHandle<T> stateHandle =
stateHandleProvider.createStateHandle(state);
+
+ boolean success = false;
+
+ try {
+ // Serialize the state handle. This writes the state to
the backend.
+ byte[] serializedStateHandle =
InstantiationUtil.serializeObject(stateHandle);
+
+ // Write state handle (not the actual state) to
ZooKeeper. This is expected to be
+ // smaller than the state itself. This level of
indirection makes sure that data in
+ // ZooKeeper is small, because ZooKeeper is designed
for data in the KB range, but
+ // the state can be larger.
+ client
+ .create()
+ .withMode(createMode)
+ .forPath(pathInZooKeeper,
serializedStateHandle);
+
+ success = true;
+
+ return new ZooKeeperStateHandle<>(stateHandle,
pathInZooKeeper);
+ }
+ finally {
+ if (!success) {
+ // Cleanup the state handle if it was not
written to ZooKeeper.
+ if (stateHandle != null) {
+ stateHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ZooKeeper and discards the old state
handle.
+ *
+ * <p><strong>Important</strong>: This method will only discard the
state handle and not the
+ * state itself. Don't forget to run custom cleanup code of the state,
if necessary.
+ *
+ * <pre>
+ * T state = get(path).getState();
+ * state.discard(); // Custom clean up
+ * replace(path, version, newState)
+ * </pre>
+ *
+ * @param pathInZooKeeper Destination path in ZooKeeper (expected to
exist and start with a '/')
+ * @param expectedVersion Expected version of the node to replace
+ * @param state The new state to replace the old one
+ */
+ public void replace(String pathInZooKeeper, int expectedVersion, T
state) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(state, "State");
+
+ StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+
+ StateHandle<T> stateHandle =
stateHandleProvider.createStateHandle(state);
+
+ boolean success = false;
+
+ try {
+ // Serialize the new state handle. This writes the
state to the backend.
+ byte[] serializedStateHandle =
InstantiationUtil.serializeObject(stateHandle);
+
+ // Replace state handle in ZooKeeper.
+ client.setData()
+ .withVersion(expectedVersion)
+ .forPath(pathInZooKeeper,
serializedStateHandle);
+
+ success = true;
+ }
+ finally {
+ if (success) {
+ oldStateHandle.discardState();
+ }
+ else {
+ stateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the version of the node if it exists or <code>-1</code> if
it doesn't.
+ *
+ * @param pathInZooKeeper Path in ZooKeeper to check
+ * @return Version of the ZNode if the path exists, <code>-1</code>
otherwise.
+ * @throws Exception
+ */
+ public int exists(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ Stat stat = client.checkExists().forPath(pathInZooKeeper);
+
+ if (stat != null) {
+ return stat.getVersion();
+ }
+
+ return -1;
+ }
+
+ /**
+ * Gets a state handle from ZooKeeper.
+ *
+ * @param pathInZooKeeper Path in ZooKeeper to get the state handle
from (expected to
+ * exist and start with a '/').
+ * @return The state handle
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ byte[] data = client.getData().forPath(pathInZooKeeper);
+
+ return (StateHandle<T>)
InstantiationUtil.deserializeObject(data, ClassLoader
+ .getSystemClassLoader());
+ }
+
+ /**
+ * Gets all available state handles from ZooKeeper.
+ *
+ * @return All state handles from ZooKeeper.
+ * @throws ConcurrentModificationException
+ */
+ @SuppressWarnings("unchecked")
+ public List<ZooKeeperStateHandle<T>> getAll() throws Exception {
+ // Initial cVersion (number of changes to the children of this
node)
+ int initialCVersion =
client.checkExists().forPath("/").getCversion();
+
+ List<String> children = client.getChildren().forPath("/");
+
+ final List<ZooKeeperStateHandle<T>> stateHandles = new
ArrayList<>(children.size());
+
+ for (String path : children) {
+ path = "/" + path;
+
+ try {
+ final StateHandle<T> stateHandle = get(path);
+ stateHandles.add(new
ZooKeeperStateHandle(stateHandle, path));
+ }
+ catch (KeeperException.NoNodeException e) {
+ throw new
ConcurrentModificationException("Modification during getAll()", e);
+ }
+ }
+
+ verifyExpectedCVersion(initialCVersion, "/", "Modification
during getAll()");
+
+ return stateHandles;
+ }
+
+ /**
+ * Gets all available state handles from ZooKeeper sorted by name
(ascending).
+ *
+ * @return All state handles in ZooKeeper.
+ */
+ @SuppressWarnings("unchecked")
+ public List<ZooKeeperStateHandle<T>> getAllSortedByName() throws
Exception {
+ // Initial cVersion (number of changes to the children of this
node)
+ int initialCVersion =
client.checkExists().forPath("/").getCversion();
+
+ List<String> children = ZKPaths.getSortedChildren(
+ client.getZookeeperClient().getZooKeeper(),
+ ZKPaths.fixForNamespace(client.getNamespace(),
"/"));
+
+ final List<ZooKeeperStateHandle<T>> stateHandles = new
ArrayList<>(children.size());
+
+ for (String path : children) {
+ path = "/" + path;
+ try {
+ final StateHandle<T> stateHandle = get(path);
+ stateHandles.add(new
ZooKeeperStateHandle(stateHandle, path));
+ }
+ catch (KeeperException.NoNodeException e) {
+ throw new
ConcurrentModificationException("Modification during getAllSortedByName()", e);
+ }
+ }
+
+ verifyExpectedCVersion(initialCVersion, "/", "Modification
during getAllSortedByName()");
+
+ return stateHandles;
+ }
+
+ /**
+ * Removes a state handle from ZooKeeper.
+ *
+ * <p><stong>Important</stong>: this does not discard the state handle.
If you want to
+ * discard the state handle call {@link #discard(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to remove (expected to
start with a '/')
+ * @throws Exception
+ */
+ public void remove(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ client
+ .delete()
+ .deletingChildrenIfNeeded()
+ .forPath(pathInZooKeeper);
+ }
+
+ /**
+ * Removes a state handle from ZooKeeper asynchronously.
+ *
+ * <p><stong>Important</stong>: this does not discard the state handle.
If you want to
+ * discard the state handle call {@link #discard(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to remove (expected to
start with a '/')
+ * @param callback The callback after the operation finishes
+ * @throws Exception
+ */
+ public void remove(String pathInZooKeeper, BackgroundCallback callback)
throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(callback, "Background callback");
+
+ client
+ .delete()
+ .deletingChildrenIfNeeded()
+ .inBackground(callback)
+ .forPath(pathInZooKeeper);
+ }
+
+ /**
+ * Discards a state handle and removes it from ZooKeeper.
+ *
+ * <p><strong>Important</strong>: This method will only discard the
state handle and not the
+ * state itself. Don't forget to run custom cleanup code of the state,
if necessary.
+ *
+ * <pre>
+ * T state = get(path).getState();
+ * state.discard(); // Custom clean up
+ * discard(path)
+ * </pre>
+ *
+ * <p>If you only want to remove the state handle in ZooKeeper call
{@link #remove(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to discard (expected to
start with a '/')
+ * @throws Exception
+ */
+ public void discard(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ StateHandle<T> stateHandle = get(pathInZooKeeper);
+
+ // Delete the state handle from ZooKeeper first
+ client
+ .delete()
+ .deletingChildrenIfNeeded()
+ .forPath(pathInZooKeeper);
--- End diff --
indentation
> Recover running jobs on JobManager failure
> ------------------------------------------
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Affects Versions: master
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the
> leading job manager looses all running jobs when it fails. After a new
> leading job manager is elected, it is not possible to recover any previously
> running jobs.
> Solution: The leading job manager, which receives the job graph writes 1) the
> job graph to a state backend, and 2) a reference to the respective state
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs,
> because they include closures etc.). ZooKeeper is not designed for data of
> this size. The level of indirection via the reference to the state backend
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
> +- currentJobs
> +- job id i
> +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs
> between job managers. The currentJobs node needs to satisfy the following
> invariant: There is a reference to a job graph with id i IFF the respective
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if
> resubmitted). The next step is to backup the runtime state handles of
> checkpoints in a similar manner.
> ---
> This work will be based on [[email protected]]'s implementation of
> FLINK-2291. The leader election service notifies the job manager about
> granted/revoked leadership. This notification happens via Akka and thus
> serially *per* job manager, but results in eventually consistent state
> between job managers. For some snapshots of time it is possible to have a new
> leader granted leadership, before the old one has been revoked its leadership.
> [[email protected]], can you confirm that leadership does not guarantee
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to
> coordinate the access to currentJobs. The lock needs to be acquired on
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting
> interleavings
> - Process failure integration tests with single and multiple running jobs
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)