[
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943152#comment-14943152
]
ASF GitHub Bot commented on FLINK-2354:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1153#discussion_r41126979
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java
---
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link
RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each job graph creates ZNode:
+ * <pre>
+ * +----O /flink/jobgraphs/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/<job-id> N [persistent]
+ * </pre>
+ *
+ * <p>The root path is watched to detect concurrent modifications in
corner situations where
+ * multiple instances operate concurrently. The job manager acts as a
{@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
+
+ /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+ private final Object cacheLock = new Object();
+
+ /** Client (not a namespace facade) */
+ private final CuratorFramework client;
+
+ /** The set of IDs of all added job graphs. */
+ private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+ /** Completed checkpoints in ZooKeeper */
+ private final ZooKeeperStateHandleStore<SubmittedJobGraph>
jobGraphsInZooKeeper;
+
+ /**
+ * Cache to monitor all children. This is used to detect races with
other instances working
+ * on the same state.
+ */
+ private final PathChildrenCache pathCache;
+
+ /** The external listener to be notified on races. */
+ private SubmittedJobGraphListener jobGraphListener;
+
+ /** Flag indicating whether this instance is running. */
+ private boolean isRunning;
+
+ public ZooKeeperSubmittedJobGraphs(
+ CuratorFramework client,
+ String currentJobsPath,
+ StateHandleProvider<SubmittedJobGraph>
stateHandleProvider) throws Exception {
+
+ checkNotNull(client, "Curator client");
+ checkNotNull(currentJobsPath, "Current jobs path");
+ checkNotNull(stateHandleProvider, "State handle provider");
+
+ // Keep a reference to the original client and not the
namespace facade. The namespace
+ // facade cannot be closed.
+ this.client = client;
+
+ // Ensure that the job graphs path exists
+ client.newNamespaceAwareEnsurePath(currentJobsPath)
+ .ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ client = client.usingNamespace(client.getNamespace() +
currentJobsPath);
+
+ this.jobGraphsInZooKeeper = new
ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+ this.pathCache = new PathChildrenCache(client, "/", false);
+ pathCache.getListenable().addListener(new
SubmittedJobGraphsPathCacheListener());
+ }
+
+ @Override
+ public void start(SubmittedJobGraphListener jobGraphListener) throws
Exception {
+ synchronized (cacheLock) {
+ if (!isRunning) {
+ this.jobGraphListener = jobGraphListener;
+
+ pathCache.start();
+
+ isRunning = true;
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (cacheLock) {
+ if (isRunning) {
+ jobGraphListener = null;
+
+ pathCache.close();
+
+ client.close();
+
+ isRunning = false;
+ }
+ }
+ }
+
+ @Override
+ public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ List<ZooKeeperStateHandle<SubmittedJobGraph>> submitted;
+
+ while (true) {
+ try {
+ submitted =
jobGraphsInZooKeeper.getAll();
+ break;
+ }
+ catch (ConcurrentModificationException e) {
+ LOG.warn("Concurrent modification while
reading from ZooKeeper. Retrying.");
+ }
+ }
+
+ if (submitted.size() != 0) {
+ List<SubmittedJobGraph> jobGraphs = new
ArrayList<>(submitted.size());
+
+ for (ZooKeeperStateHandle<SubmittedJobGraph>
jobStateHandle : submitted) {
+ SubmittedJobGraph jobGraph =
jobStateHandle
+ .getStateHandle()
+
.getState(ClassLoader.getSystemClassLoader());
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ jobGraphs.add(jobGraph);
+ }
+
+ LOG.info("Recovered {} job graphs: {}.",
jobGraphs.size(), jobGraphs);
+ return jobGraphs;
+ }
+ else {
+ LOG.info("No job graph to recover.");
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ @Override
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ String path = getPathForJob(jobId);
+
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ StateHandle<SubmittedJobGraph> jobStateHandle =
jobGraphsInZooKeeper.get(path);
+
+ SubmittedJobGraph jobGraph = jobStateHandle
+
.getState(ClassLoader.getSystemClassLoader());
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ LOG.info("Recovered {}.", jobGraph);
+ return jobGraph;
+ }
+ }
+
+ @Override
+ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+ checkNotNull(jobGraph, "Job graph");
+ String path = getPathForJob(jobGraph.getJobId());
+
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ int currentVersion = jobGraphsInZooKeeper.exists(path);
+
+ if (currentVersion == -1) {
+ jobGraphsInZooKeeper.add(path, jobGraph);
+ LOG.info("Added {} to ZooKeeper.", jobGraph);
+ }
+ else if (addedJobGraphs.contains(jobGraph.getJobId())) {
+ jobGraphsInZooKeeper.replace(path,
currentVersion, jobGraph);
+ LOG.info("Updated {} in ZooKeeper.", jobGraph);
+ }
+ else {
+ throw new IllegalStateException("Oh, no. Trying
to update a graph you didn't " +
+ "#getAllSubmittedJobGraphs() or
#putJobGraph() yourself before.");
+ }
+
+ addedJobGraphs.add(jobGraph.getJobId());
+ }
+ }
+
+ @Override
+ public void removeJobGraph(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ String path = getPathForJob(jobId);
+
+ synchronized (cacheLock) {
+ if (addedJobGraphs.contains(jobId)) {
+ jobGraphsInZooKeeper.discard(path);
+
+ addedJobGraphs.remove(jobId);
+ LOG.info("Removed job graph {} from
ZooKeeper.", jobId);
+ }
+ }
+ }
+
+ /**
+ * Monitors ZooKeeper for changes.
+ *
+ * <p>Detects modifications from other job managers in corner
situations. The event
+ * notifications fire for changes from this job manager as well.
+ */
+ private final class SubmittedJobGraphsPathCacheListener implements
PathChildrenCacheListener {
+
+ @Override
+ public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event)
+ throws Exception {
+
+ if (LOG.isDebugEnabled()) {
+ if (event.getData() != null) {
+ LOG.debug("Received {} event (path:
{})", event.getType(), event.getData().getPath());
+ }
+ else {
+ LOG.debug("Received {} event",
event.getType());
+ }
+ }
+
+ synchronized (cacheLock) {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ try {
+ JobID jobId =
fromEvent(event);
+ if (jobGraphListener !=
null && !addedJobGraphs.contains(jobId)) {
+ try {
+ //
Whoa! This has been added by someone else. Or we were fast
+ // to
remove it (false positive).
+
jobGraphListener.onAddedJobGraph(jobId);
+ }
+ catch
(Throwable t) {
+
LOG.error("Error in callback", t);
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Error in
SubmittedJobGraphsPathCacheListener", e);
+ }
+
+ break;
+
+ case CHILD_UPDATED:
+ // TODO What to do if someone
overwrites one of our job graphs? And how
+ // to tell whose it is? Is the
update-time check enough to prevent these
+ // problematic situations.
--- End diff --
Hmm, this should at least be logged.
> Recover running jobs on JobManager failure
> ------------------------------------------
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Affects Versions: master
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr: Persist JobGraphs in a state backend and coordinate references to the
> state handles via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the
> leading job manager loses all running jobs when it fails. After a new
> leading job manager is elected, it is not possible to recover any previously
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the
> job graph to a state backend and 2) a reference to the respective state
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs,
> because they include closures etc.). ZooKeeper is not designed for data of
> this size. The level of indirection via the reference to the state backend
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs
> across job managers. The currentJobs node needs to satisfy the following
> invariant: there is a reference to a job graph with id i if and only if the
> respective job graph needs to be recovered by a newly elected job manager
> leader. With this in place, jobs will be recovered from their initial state
> (as if resubmitted). The next step is to back up the runtime state handles of
> checkpoints in a similar manner.
> ---
> This work will be based on [[email protected]]'s implementation of
> FLINK-2291. The leader election service notifies the job manager about
> granted/revoked leadership. This notification happens via Akka and thus
> serially *per* job manager, but results in eventually consistent state
> between job managers. For short periods of time, a new leader may already
> have been granted leadership before the old leader's leadership has been
> revoked.
> [[email protected]], can you confirm that leadership does not guarantee
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1's
> notification revoking JM 1's leadership takes longer
> - JOB i finishes (the TMs have not yet noticed the leadership change) and JM 1
> receives the final JobStatusChange
> - JM 2 resubmits job i, recovered from /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (with respect to the invariant specified above): job i
> is running on JM 2, but /flink/currentJobs/i no longer exists, so job i
> cannot be recovered if JM 2 fails
> If it is indeed a problem, we can circumvent this with a Curator recipe for
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to
> coordinate access to currentJobs. The lock needs to be acquired when
> leadership is granted.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and for writing to the state backend
> and ZooKeeper, verifying the expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting
> interleavings
> - Process failure integration tests with single and multiple running jobs
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)