[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948786#comment-14948786
 ] 

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41523760
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
    @@ -0,0 +1,423 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.data.Stat;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * State handles backed by ZooKeeper.
    + *
    + * <p>Added state is persisted via {@link StateHandle}s, which in turn are 
written to
    + * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
    + * small. ZooKeeper is built for data in the KB range, whereas state can grow to multiple MBs.
    + *
    + * <p>State modifications require some care, because it is possible that 
certain failures bring
    + * the state handle backend and ZooKeeper out of sync.
    + *
    + * <p>ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
    + *
    + * <pre>
    + * State handle in ZooKeeper => State handle exists
    + * </pre>
    + *
    + * But not:
    + *
    + * <pre>
    + * State handle exists => State handle in ZooKeeper
    + * </pre>
    + *
    + * There can be lingering state handles when failures happen during 
operation. They
    + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
    + * FLINK-2513</a> about a possible way to overcome this).
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandleStore<T extends Serializable> {
    +
    +   /** Curator ZooKeeper client */
    +   private final CuratorFramework client;
    +
    +   /** State handle provider */
    +   private final StateHandleProvider<T> stateHandleProvider;
    +
    +   /**
    +    * Creates a {@link ZooKeeperStateHandleStore}.
    +    *
    +    * @param client              The Curator ZooKeeper client. 
<strong>Important:</strong> It is
    +    *                            expected that the client's namespace 
ensures that the root
    +    *                            path is exclusive for all state handles 
managed by this
    +    *                            instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code>
    +    * @param stateHandleProvider The state handle provider for the state
    +    */
    +   public ZooKeeperStateHandleStore(
    +                   CuratorFramework client,
    +                   StateHandleProvider<T> stateHandleProvider) {
    +
    +           this.client = checkNotNull(client, "Curator client");
    +           this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
    +   }
    +
    +   /**
    +    * Creates a state handle and stores it in ZooKeeper with create mode 
{@link
    +    * CreateMode#PERSISTENT}.
    +    *
    +    * @see #add(String, Serializable, CreateMode)
    +    */
    +   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
    +           return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
    +   }
    +
    +   /**
    +    * Creates a state handle and stores it in ZooKeeper.
    +    *
    +    * <p><strong>Important</strong>: This will <em>not</em> store the 
actual state in
    +    * ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
    +    * makes sure that data in ZooKeeper is small.
    +    *
    +    * @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet and
    +    *                        start with a '/')
    +    * @param state           State to be added
    +    * @param createMode      The create mode for the new path in ZooKeeper
    +    * @return Created {@link ZooKeeperStateHandle}
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state, 
CreateMode createMode) throws Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +           checkNotNull(state, "State");
    +
    +           // Create the state handle. Nothing persisted yet.
    +           StateHandle<T> stateHandle = 
stateHandleProvider.createStateHandle(state);
    +
    +           boolean success = false;
    +
    +           try {
    +                   // Serialize the state handle. This writes the state to 
the backend.
    +                   byte[] serializedStateHandle = 
InstantiationUtil.serializeObject(stateHandle);
    +
    +                   // Write state handle (not the actual state) to 
ZooKeeper. This is expected to be
    +                   // smaller than the state itself. This level of 
indirection makes sure that data in
    +                   // ZooKeeper is small, because ZooKeeper is designed 
for data in the KB range, but
    +                   // the state can be larger.
    +                   
client.create().withMode(createMode).forPath(pathInZooKeeper, 
serializedStateHandle);
    +
    +                   success = true;
    +
    +                   return new ZooKeeperStateHandle<>(stateHandle, 
pathInZooKeeper);
    +           }
    +           finally {
    +                   if (!success) {
    +                           // Cleanup the state handle if it was not 
written to ZooKeeper.
    +                           if (stateHandle != null) {
    +                                   stateHandle.discardState();
    +                           }
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Replaces a state handle in ZooKeeper and discards the old state 
handle.
    +    *
    +    * @param pathInZooKeeper Destination path in ZooKeeper (expected to 
exist and start with a '/')
    +    * @param expectedVersion Expected version of the node to replace
    +    * @param state           The new state to replace the old one
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   public void replace(String pathInZooKeeper, int expectedVersion, T 
state) throws Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +           checkNotNull(state, "State");
    +
    +           StateHandle<T> oldStateHandle = get(pathInZooKeeper);
    +
    +           StateHandle<T> stateHandle = 
stateHandleProvider.createStateHandle(state);
    +
    +           boolean success = false;
    +
    +           try {
    +                   // Serialize the new state handle. This writes the 
state to the backend.
    +                   byte[] serializedStateHandle = 
InstantiationUtil.serializeObject(stateHandle);
    +
    +                   // Replace state handle in ZooKeeper.
    +                   client.setData()
    +                                   .withVersion(expectedVersion)
    +                                   .forPath(pathInZooKeeper, 
serializedStateHandle);
    +
    +                   success = true;
    +           }
    +           finally {
    +                   if (success) {
    +                           oldStateHandle.discardState();
    +                   }
    +                   else {
    +                           stateHandle.discardState();
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Returns the version of the node if it exists or <code>-1</code> if 
it doesn't.
    +    *
    +    * @param pathInZooKeeper Path in ZooKeeper to check
    +    * @return Version of the ZNode if the path exists, <code>-1</code> 
otherwise.
    +    * @throws Exception If the ZooKeeper operation fails
    +    */
    +   public int exists(String pathInZooKeeper) throws Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +
    +           Stat stat = client.checkExists().forPath(pathInZooKeeper);
    +
    +           if (stat != null) {
    +                   return stat.getVersion();
    +           }
    +
    +           return -1;
    +   }
    +
    +   /**
    +    * Gets a state handle from ZooKeeper.
    +    *
    +    * @param pathInZooKeeper Path in ZooKeeper to get the state handle 
from (expected to
    +    *                        exist and start with a '/').
    +    * @return The state handle
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   @SuppressWarnings("unchecked")
    +   public ZooKeeperStateHandle<T> get(String pathInZooKeeper) throws 
Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +
    +           byte[] data = client.getData().forPath(pathInZooKeeper);
    +
    +           StateHandle<T> stateHandle = (StateHandle<T>) InstantiationUtil
    +                           .deserializeObject(data, 
ClassLoader.getSystemClassLoader());
    +
    +           return new ZooKeeperStateHandle<>(stateHandle, pathInZooKeeper);
    +   }
    +
    +   /**
    +    * Gets all available state handles from ZooKeeper.
    +    *
    +    * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
    +    *
    +    * @return All state handles from ZooKeeper.
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   @SuppressWarnings("unchecked")
    +   public List<ZooKeeperStateHandle<T>> getAll() throws Exception {
    +           final List<ZooKeeperStateHandle<T>> stateHandles = new 
ArrayList<>();
    +
    +           boolean success = false;
    +
    +           retry:
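    +           while (!success) {
    +                   // Start from an empty list on every attempt so that a retry does not add duplicates.
    +                   stateHandles.clear();
    +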
    +                   // Initial cVersion (number of changes to the children 
of this node)
    +                   int initialCVersion = 
client.checkExists().forPath("/").getCversion();
    +
    +                   List<String> children = 
client.getChildren().forPath("/");
    +
    +                   for (String path : children) {
    +                           path = "/" + path;
    +
    +                           try {
    +                                   final StateHandle<T> stateHandle = 
get(path);
    +                                   stateHandles.add(new 
ZooKeeperStateHandle(stateHandle, path));
    +                           }
    +                           catch (KeeperException.NoNodeException ignored) 
{
    +                                   // Concurrent deletion, retry
    +                                   continue retry;
    +                           }
    +                   }
    +
    +                   int finalCVersion = 
client.checkExists().forPath("/").getCversion();
    +
    +                   // Check for concurrent modifications
    +                   success = initialCVersion == finalCVersion;
    +           }
    +
    +           return stateHandles;
    +   }
    +
    +   /**
    +    * Gets all available state handles from ZooKeeper sorted by name 
(ascending).
    +    *
    +    * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
    +    *
    +    * @return All state handles in ZooKeeper.
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   @SuppressWarnings("unchecked")
    +   public List<ZooKeeperStateHandle<T>> getAllSortedByName() throws 
Exception {
    +           final List<ZooKeeperStateHandle<T>> stateHandles = new 
ArrayList<>();
    +
    +           boolean success = false;
    +
    +           retry:
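    +           while (!success) {
    +                   // Start from an empty list on every attempt so that a retry does not add duplicates.
    +                   stateHandles.clear();
    +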
    +                   // Initial cVersion (number of changes to the children 
of this node)
    +                   int initialCVersion = 
client.checkExists().forPath("/").getCversion();
    +
    +                   List<String> children = ZKPaths.getSortedChildren(
    +                                   
client.getZookeeperClient().getZooKeeper(),
    +                                   
ZKPaths.fixForNamespace(client.getNamespace(), "/"));
    +
    +                   for (String path : children) {
    +                           path = "/" + path;
    +
    +                           try {
    +                                   final StateHandle<T> stateHandle = 
get(path);
    +                                   stateHandles.add(new 
ZooKeeperStateHandle(stateHandle, path));
    +                           }
    +                           catch (KeeperException.NoNodeException ignored) 
{
    +                                   // Concurrent deletion, retry
    +                                   continue retry;
    +                           }
    +                   }
    +
    +                   int finalCVersion = 
client.checkExists().forPath("/").getCversion();
    +
    +                   // Check for concurrent modifications
    +                   success = initialCVersion == finalCVersion;
    +           }
    +
    +           return stateHandles;
    +   }
    +
    +   /**
    +    * Removes a state handle from ZooKeeper.
    +    *
    +    * <p><strong>Important</strong>: this does not discard the state handle. If you want to
    +    * discard the state handle, call {@link #removeAndDiscardState(String)}.
    +    *
    +    * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
    +    * @throws Exception If the ZooKeeper operation fails
    +    */
    +   public void remove(String pathInZooKeeper) throws Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +
    +           
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
    +   }
    +
    +   /**
    +    * Removes a state handle from ZooKeeper asynchronously.
    +    *
    +    * <p><strong>Important</strong>: this does not discard the state handle. If you want to
    +    * discard the state handle, call {@link #removeAndDiscardState(String)}.
    +    *
    +    * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
    +    * @param callback        The callback after the operation finishes
    +    * @throws Exception If the ZooKeeper operation fails
    +    */
    +   public void remove(String pathInZooKeeper, BackgroundCallback callback) 
throws Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +           checkNotNull(callback, "Background callback");
    +
    +           
client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
    +   }
    +
    +   /**
    +    * Discards a state handle and removes it from ZooKeeper.
    +    *
    +    * <p>If you only want to remove the state handle in ZooKeeper call 
{@link #remove(String)}.
    +    *
    +    * @param pathInZooKeeper Path of state handle to discard (expected to 
start with a '/')
    +    * @throws Exception If the ZooKeeper or state handle operation fails
    +    */
    +   public void removeAndDiscardState(String pathInZooKeeper) throws 
Exception {
    +           checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +
    +           StateHandle<T> stateHandle = get(pathInZooKeeper);
    +
    +           // Delete the state handle from ZooKeeper first
    +           
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
    +
    +           // Discard the state handle only after it has been successfully 
deleted from ZooKeeper.
    +           // Otherwise we might enter an illegal state after failures 
(with a state handle in
    +           // ZooKeeper, which has already been discarded).
    +           stateHandle.discardState();
    +   }
    +
    +   /**
    +    * Discards all available state handles and removes them from ZooKeeper.
    +    *
    +    * @throws Exception If a ZooKeeper or state handle operation fails
    +    */
    +   public void removeAndDiscardAllState() throws Exception {
    +           final List<ZooKeeperStateHandle<T>> allStateHandles = getAll();
    +
    +           ZKPaths.deleteChildren(
    +                           client.getZookeeperClient().getZooKeeper(),
    +                           ZKPaths.fixForNamespace(client.getNamespace(), 
"/"),
    +                           false);
    +
    +           // Discard the state handles only after they have been 
successfully deleted from ZooKeeper.
    +           for (ZooKeeperStateHandle<T> stateHandle : allStateHandles) {
    +                   stateHandle.discardState();
    +           }
    +   }
    +
    +   /**
    +    * A {@link StateHandle} with its path in ZooKeeper.
    +    *
    +    * @param <T> Type of state
    +    */
    +   public static class ZooKeeperStateHandle<T> implements StateHandle<T> {
    +
    +           private static final long serialVersionUID = 
-7289601198657398910L;
    +
    +           private final StateHandle<T> stateHandle;
    +
    +           private final String pathInZooKeeper;
    +
    +           private ZooKeeperStateHandle(StateHandle<T> stateHandle, String 
pathInZooKeeper) {
    +                   this.stateHandle = checkNotNull(stateHandle, "State 
handle");
    +                   this.pathInZooKeeper = checkNotNull(pathInZooKeeper, 
"Path in ZooKeeper");
    +           }
    +
    +           @Override
    +           public T getState(ClassLoader userClassLoader) throws Exception 
{
    +                   return stateHandle.getState(userClassLoader);
    +           }
    +
    +           @Override
    +           public void discardState() throws Exception {
    --- End diff --
    
    Shouldn't this call also remove the state handle from ZooKeeper? You
    could make this class an inner class of `ZooKeeperStateHandleStore` and
    then simply call `discardState(pathInZooKeeper)`.
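
A minimal sketch of what this suggestion could look like, not the PR's code. A `HashMap` stands in for ZooKeeper, and `InMemoryStateHandleStore`, `Handle`, and `getDiscardedCount` are hypothetical names. The sketch routes the handle's `discardState()` through the store's `removeAndDiscardState`, assuming that is the intended target of the call.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the store in the diff; a HashMap plays the role of ZooKeeper. */
class InMemoryStateHandleStore {

    /** Hypothetical replacement for the ZooKeeper namespace: path -> handle. */
    private final Map<String, Handle> nodes = new HashMap<>();

    /** Counts discards so that the effect is observable from outside. */
    private int discardedCount = 0;

    /** Non-static inner class: every handle can reach the store that created it. */
    class Handle {
        private final String path;
        private final String state;

        private Handle(String path, String state) {
            this.path = path;
            this.state = state;
        }

        String getState() {
            return state;
        }

        /** Delegates to the enclosing store, so node and state are dropped together. */
        void discardState() {
            removeAndDiscardState(path);
        }
    }

    /** Registers a new handle; the path is expected not to exist yet. */
    Handle add(String path, String state) {
        if (nodes.containsKey(path)) {
            throw new IllegalStateException("Path already exists: " + path);
        }
        Handle handle = new Handle(path, state);
        nodes.put(path, handle);
        return handle;
    }

    /** Removes the node first and only then counts the state as discarded. */
    void removeAndDiscardState(String path) {
        Handle removed = nodes.remove(path);
        if (removed != null) {
            discardedCount++;
        }
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    int getDiscardedCount() {
        return discardedCount;
    }
}
```

One trade-off to keep in mind: a non-static inner class holds an implicit reference to its enclosing instance, so serializing such a handle would also try to serialize the store and everything it references.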


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 0.10
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to back up the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. At some points in time, a new leader can be granted 
> leadership before the old leader's leadership has been revoked.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs
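
The interleaving described in the issue can be replayed in a small single-threaded sketch. This is only an illustration under stated assumptions. A `HashSet` stands in for the children of `/flink/currentJobs`, and a local `java.util.concurrent.Semaphore` stands in for the proposed shared lock (it is not the Curator recipe). `CurrentJobsRace` and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

/** Single-threaded replay of the interleaving; all names here are hypothetical. */
class CurrentJobsRace {

    /** Invariant from the issue: a reference to job i exists iff job i needs recovery. */
    private static boolean invariantHolds(Set<String> currentJobs, boolean jobNeedsRecovery) {
        return currentJobs.contains("i") == jobNeedsRecovery;
    }

    /** Replays the steps without any coordination between the two job managers. */
    static boolean invariantHoldsWithoutLock() {
        Set<String> currentJobs = new HashSet<>(); // stand-in for /flink/currentJobs
        currentJobs.add("i");                      // JM 1 is leader, job i is running

        // JM 2 is granted leadership before JM 1 is revoked and resubmits job i.
        currentJobs.add("i");
        boolean runningOnNewLeader = true;

        // JM 1 still believes it leads, sees the final status, and removes the node.
        currentJobs.remove("i");

        return invariantHolds(currentJobs, runningOnNewLeader);
    }

    /** Replays the same steps, but access to currentJobs requires holding a lock. */
    static boolean invariantHoldsWithLock() {
        Set<String> currentJobs = new HashSet<>();
        Semaphore lock = new Semaphore(1);         // local stand-in for the shared lock

        boolean jm1HasLock = lock.tryAcquire();    // JM 1 acquires the lock on leadership
        currentJobs.add("i");

        // JM 2 is granted leadership, but must hold the lock before touching currentJobs.
        boolean runningOnNewLeader = false;
        if (lock.tryAcquire()) {
            runningOnNewLeader = currentJobs.contains("i");
        }

        // JM 1 removes the finished job and releases the lock once it is revoked.
        currentJobs.remove("i");
        if (jm1HasLock) {
            lock.release();
        }

        // JM 2 retries, now gets the lock, and recovers only what is still registered.
        if (lock.tryAcquire()) {
            runningOnNewLeader = currentJobs.contains("i");
        }

        return invariantHolds(currentJobs, runningOnNewLeader);
    }
}
```

In this replay the uncoordinated variant ends with job i running on JM 2 but without a node in `currentJobs`. In the locked variant, JM 2 cannot act until JM 1 has finished its cleanup.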



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
