[ 
https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614803#comment-16614803
 ] 

ASF GitHub Bot commented on FLINK-10329:
----------------------------------------

asfgit closed pull request #6686: [FLINK-10329] [FLINK-10328] Fail 
ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & 
Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924b448..d8ad5aba8ea 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -36,8 +37,8 @@
        @Nonnull
        private final SavepointRestoreSettings savepointRestoreSettings;
 
-       public StandaloneJobClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, 
@Nonnull String jobClassName, @Nonnull SavepointRestoreSettings 
savepointRestoreSettings) {
-               super(configDir, dynamicProperties, args, restPort);
+       public StandaloneJobClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String 
hostname, int restPort, @Nonnull String jobClassName, @Nonnull 
SavepointRestoreSettings savepointRestoreSettings) {
+               super(configDir, dynamicProperties, args, hostname, restPort);
                this.jobClassName = jobClassName;
                this.savepointRestoreSettings = savepointRestoreSettings;
        }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba864ed..17217eff018 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@
 
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -67,6 +68,7 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
                final Properties dynamicProperties = 
commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
                final String restPortString = 
commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
                final int restPort = Integer.parseInt(restPortString);
+               final String hostname = 
commandLine.getOptionValue(HOST_OPTION.getOpt());
                final String jobClassName = 
commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
                final SavepointRestoreSettings savepointRestoreSettings = 
CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
@@ -74,6 +76,7 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
                        configDir,
                        dynamicProperties,
                        commandLine.getArgs(),
+                       hostname,
                        restPort,
                        jobClassName,
                        savepointRestoreSettings);
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a985d..6fc5b76f246 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@
  * @param <E> type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends 
BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
 
        /**
         * Performs this operation on the given arguments.
@@ -39,14 +39,23 @@
         * @param u the second input argument
         * @throws E in case of an error
         */
-       void acceptWithException(T t, U u) throws E;
+       void accept(T t, U u) throws E;
 
-       @Override
-       default void accept(T t, U u) {
-               try {
-                       acceptWithException(t, u);
-               } catch (Throwable e) {
-                       ExceptionUtils.rethrow(e);
-               }
+       /**
+        * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+        *
+        * @param biConsumerWithException BiConsumer with exception to convert 
into a {@link BiConsumer}.
+        * @param <A> first input type
+        * @param <B> second input type
+        * @return {@link BiConsumer} which rethrows all checked exceptions as 
unchecked.
+        */
+       static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, 
?> biConsumerWithException) {
+               return (A a, B b) -> {
+                       try {
+                               biConsumerWithException.accept(a, b);
+                       } catch (Throwable t) {
+                               ExceptionUtils.rethrow(t);
+                       }
+               };
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737e584..ccba8a7e774 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@
  * @param <E> type of the exception which can be thrown
  */
 @FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends 
BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
 
        /**
         * Apply the given values t and u to obtain the resulting value. The 
operation can
@@ -42,16 +42,25 @@
         * @return result value
         * @throws E if the operation fails
         */
-       R applyWithException(T t, U u) throws E;
+       R apply(T t, U u) throws E;
 
-       default R apply(T t, U u) {
-               try {
-                       return applyWithException(t, u);
-               } catch (Throwable e) {
-                       ExceptionUtils.rethrow(e);
-                       // we have to return a value to please the compiler
-                       // but we will never reach the code here
-                       return null;
-               }
+       /**
+        * Convert a {@link BiFunctionWithException} into a {@link BiFunction}.
+        *
+        * @param biFunctionWithException function with exception to convert 
into a function
+        * @param <A> input type
+        * @param <B> output type
+        * @return {@link BiFunction} which throws all checked exception as an 
unchecked exception.
+        */
+       static <A, B, C> BiFunction<A, B, C> 
unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+               return (A a, B b) -> {
+                       try {
+                               return biFunctionWithException.apply(a, b);
+                       } catch (Throwable t) {
+                               ExceptionUtils.rethrow(t);
+                               // we need this to appease the compiler :-(
+                               return null;
+                       }
+               };
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4e9f2..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends 
Consumer<T> {
-
-       void acceptWithException(T value) throws E;
-
-       @Override
-       default void accept(T value) {
-               try {
-                       acceptWithException(value);
-               } catch (Throwable t) {
-                       ExceptionUtils.rethrow(t);
-               }
-       }
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
new file mode 100644
index 00000000000..678ef9f78b6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Utility class for Flink's functions.
+ */
+public class FunctionUtils {
+
+       private FunctionUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Convert a {@link FunctionWithException} into a {@link Function}.
+        *
+        * @param functionWithException function with exception to convert into 
a function
+        * @param <A> input type
+        * @param <B> output type
+        * @return {@link Function} which throws all checked exception as an 
unchecked exception.
+        */
+       public static <A, B> Function<A, B> 
uncheckedFunction(FunctionWithException<A, B, ?> functionWithException) {
+               return (A value) -> {
+                       try {
+                               return functionWithException.apply(value);
+                       } catch (Throwable t) {
+                               ExceptionUtils.rethrow(t);
+                               // we need this to appease the compiler :-(
+                               return null;
+                       }
+               };
+       }
+
+       /**
+        * Converts a {@link ThrowingConsumer} into a {@link Consumer} which 
throws checked exceptions
+        * as unchecked.
+        *
+        * @param throwingConsumer to convert into a {@link Consumer}
+        * @param <A> input type
+        * @return {@link Consumer} which throws all checked exceptions as 
unchecked
+        */
+       public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> 
throwingConsumer) {
+               return (A value) -> {
+                       try {
+                               throwingConsumer.accept(value);
+                       } catch (Throwable t) {
+                               ExceptionUtils.rethrow(t);
+                       }
+               };
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java 
b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef4207838..0dd4047a1e5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Similar to a {@link Runnable}, this interface is used to capture a block of 
code
@@ -35,4 +36,21 @@
         * @throws E Exceptions may be thrown.
         */
        void run() throws E;
+
+       /**
+        * Converts a {@link ThrowingRunnable} into a {@link Runnable} which 
throws all checked exceptions
+        * as unchecked.
+        *
+        * @param throwingRunnable to convert into a {@link Runnable}
+        * @return {@link Runnable} which throws all checked exceptions as 
unchecked.
+        */
+       static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+               return () -> {
+                       try {
+                               throwingRunnable.run();
+                       } catch (Throwable t) {
+                               ExceptionUtils.rethrow(t);
+                       }
+               };
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 131733924ae..e443fc21552 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public void addCheckpoint(final CompletedCheckpoint 
checkpoint) throws Exception
                LOG.debug("Added {} to {}.", checkpoint, path);
        }
 
-       private void tryRemoveCompletedCheckpoint(CompletedCheckpoint 
completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> 
discardCallback) {
+       private void tryRemoveCompletedCheckpoint(CompletedCheckpoint 
completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> 
discardCallback) {
                try {
                        if (tryRemove(completedCheckpoint.getCheckpointID())) {
                                executor.execute(() -> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c31e64c0adc..5279e502a93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -60,11 +60,13 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -128,6 +130,8 @@
 
        private final Map<JobID, CompletableFuture<Void>> 
jobManagerTerminationFutures;
 
+       private CompletableFuture<Void> recoveryOperation = 
CompletableFuture.completedFuture(null);
+
        public Dispatcher(
                        RpcService rpcService,
                        String endpointId,
@@ -629,31 +633,51 @@ private void terminateJobManagerRunners() {
         * Recovers all jobs persisted via the submitted job graph store.
         */
        @VisibleForTesting
-       CompletableFuture<Collection<JobGraph>> recoverJobs() {
+       Collection<JobGraph> recoverJobs() throws Exception {
                log.info("Recovering all persisted jobs.");
-               return FutureUtils.supplyAsync(
-                       () -> {
-                               final Collection<JobID> jobIds = 
submittedJobGraphStore.getJobIds();
-
-                               final List<JobGraph> jobGraphs = new 
ArrayList<>(jobIds.size());
+               final Collection<JobID> jobIds = 
submittedJobGraphStore.getJobIds();
 
-                               for (JobID jobId : jobIds) {
-                                       jobGraphs.add(recoverJob(jobId));
+               try {
+                       return recoverJobGraphs(jobIds);
+               } catch (Exception e) {
+                       // release all recovered job graphs
+                       for (JobID jobId : jobIds) {
+                               try {
+                                       
submittedJobGraphStore.releaseJobGraph(jobId);
+                               } catch (Exception ie) {
+                                       e.addSuppressed(ie);
                                }
+                       }
+                       throw e;
+               }
+       }
 
-                               return jobGraphs;
-                       },
-                       getRpcService().getExecutor());
+       @Nonnull
+       private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) 
throws Exception {
+               final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+
+               for (JobID jobId : jobIds) {
+                       final JobGraph jobGraph = recoverJob(jobId);
+
+                       if (jobGraph == null) {
+                               throw new FlinkJobNotFoundException(jobId);
+                       }
+
+                       jobGraphs.add(jobGraph);
+               }
+
+               return jobGraphs;
        }
 
+       @Nullable
        private JobGraph recoverJob(JobID jobId) throws Exception {
                log.debug("Recover job {}.", jobId);
-               SubmittedJobGraph submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+               final SubmittedJobGraph submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
 
                if (submittedJobGraph != null) {
                        return submittedJobGraph.getJobGraph();
                } else {
-                       throw new FlinkJobNotFoundException(jobId);
+                       return null;
                }
        }
 
@@ -768,27 +792,40 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
         */
        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
-               log.info("Dispatcher {} was granted leadership with fencing 
token {}", getAddress(), newLeaderSessionID);
+               runAsyncWithoutFencing(
+                       () -> {
+                               log.info("Dispatcher {} was granted leadership 
with fencing token {}", getAddress(), newLeaderSessionID);
 
-               final CompletableFuture<Collection<JobGraph>> 
recoveredJobsFuture = recoverJobs();
+                               final CompletableFuture<Collection<JobGraph>> 
recoveredJobsFuture = recoveryOperation.thenApplyAsync(
+                                       FunctionUtils.uncheckedFunction(ignored 
-> recoverJobs()),
+                                       getRpcService().getExecutor());
 
-               final CompletableFuture<Boolean> fencingTokenFuture = 
recoveredJobsFuture.thenComposeAsync(
-                       (Collection<JobGraph> recoveredJobs) -> 
tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-                       getUnfencedMainThreadExecutor());
+                               final CompletableFuture<Boolean> 
fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
+                                       (Collection<JobGraph> recoveredJobs) -> 
tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+                                       getUnfencedMainThreadExecutor());
 
-               final CompletableFuture<Void> confirmationFuture = 
fencingTokenFuture.thenAcceptAsync(
-                       (Boolean confirmLeadership) -> {
-                               if (confirmLeadership) {
-                                       
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-                               }
-                       },
-                       getRpcService().getExecutor());
+                               final CompletableFuture<Void> 
confirmationFuture = fencingTokenFuture.thenCombineAsync(
+                                       recoveredJobsFuture,
+                                       
BiFunctionWithException.unchecked((Boolean confirmLeadership, 
Collection<JobGraph> recoveredJobs) -> {
+                                               if (confirmLeadership) {
+                                                       
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+                                               } else {
+                                                       for (JobGraph 
recoveredJob : recoveredJobs) {
+                                                               
submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
+                                                       }
+                                               }
+                                               return null;
+                                       }),
+                                       getRpcService().getExecutor());
+
+                               confirmationFuture.whenComplete(
+                                       (Void ignored, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       
onFatalError(ExceptionUtils.stripCompletionException(throwable));
+                                               }
+                                       });
 
-               confirmationFuture.whenComplete(
-                       (Void ignored, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       
onFatalError(ExceptionUtils.stripCompletionException(throwable));
-                               }
+                               recoveryOperation = confirmationFuture;
                        });
        }
 
@@ -813,7 +850,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
                }
        }
 
-       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
                final CompletableFuture<Void> jobManagerTerminationFuture = 
getJobTerminationFuture(jobId)
                        .exceptionally((Throwable throwable) -> {
                                throw new CompletionException(
@@ -822,14 +859,14 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
                                                throwable)); });
 
                return jobManagerTerminationFuture.thenRunAsync(
-                       () -> {
+                       ThrowingRunnable.unchecked(() -> {
                                jobManagerTerminationFutures.remove(jobId);
                                action.accept(jobGraph);
-                       },
+                       }),
                        getMainThreadExecutor());
        }
 
-       protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+       CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
                if (jobManagerRunners.containsKey(jobId)) {
                        return FutureUtils.completedExceptionally(new 
DispatcherException(String.format("Job with job id %s is still running.", 
jobId)));
                } else {
@@ -837,6 +874,11 @@ public void grantLeadership(final UUID newLeaderSessionID) 
{
                }
        }
 
+       @VisibleForTesting
+       CompletableFuture<Void> getRecoveryOperation() {
+               return recoveryOperation;
+       }
+
        private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
                // clear the state if we've been the leader before
                if (getFencingToken() != null) {
@@ -879,24 +921,63 @@ public void handleError(final Exception exception) {
 
        @Override
        public void onAddedJobGraph(final JobID jobId) {
-               final CompletableFuture<SubmittedJobGraph> recoveredJob = 
getRpcService().execute(
-                       () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-               final CompletableFuture<Acknowledge> submissionFuture = 
recoveredJob.thenComposeAsync(
-                       (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-                       getMainThreadExecutor());
-
-               submissionFuture.whenComplete(
-                       (Acknowledge acknowledge, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       onFatalError(
-                                               new DispatcherException(
-                                                       String.format("Could 
not start the added job %s", jobId),
-                                                       
ExceptionUtils.stripCompletionException(throwable)));
+               runAsync(
+                       () -> {
+                               if (!jobManagerRunners.containsKey(jobId)) {
+                                       // IMPORTANT: onAddedJobGraph can 
generate false positives and, thus, we must expect that
+                                       // the specified job is already removed 
from the SubmittedJobGraphStore. In this case,
+                                       // SubmittedJobGraphStore.recoverJob 
returns null.
+                                       final 
CompletableFuture<Optional<JobGraph>> recoveredJob = 
recoveryOperation.thenApplyAsync(
+                                               
FunctionUtils.uncheckedFunction(ignored -> 
Optional.ofNullable(recoverJob(jobId))),
+                                               getRpcService().getExecutor());
+
+                                       final DispatcherId dispatcherId = 
getFencingToken();
+                                       final CompletableFuture<Void> 
submissionFuture = recoveredJob.thenComposeAsync(
+                                               (Optional<JobGraph> 
jobGraphOptional) -> jobGraphOptional.map(
+                                                       
FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, 
dispatcherId).thenAcceptAsync(
+                                                               
FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
+                                                                               
if (!isRecoveredJobRunning) {
+                                                                               
        submittedJobGraphStore.releaseJobGraph(jobId);
+                                                                               
}
+                                                                       }),
+                                                                       
getRpcService().getExecutor())))
+                                                       
.orElse(CompletableFuture.completedFuture(null)),
+                                               
getUnfencedMainThreadExecutor());
+
+                                       submissionFuture.whenComplete(
+                                               (Void ignored, Throwable 
throwable) -> {
+                                                       if (throwable != null) {
+                                                               onFatalError(
+                                                                       new 
DispatcherException(
+                                                                               
String.format("Could not start the added job %s", jobId),
+                                                                               
ExceptionUtils.stripCompletionException(throwable)));
+                                                       }
+                                               });
+
+                                       recoveryOperation = submissionFuture;
                                }
                        });
        }
 
+       private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph 
jobGraph, DispatcherId dispatcherId) throws Exception {
+               if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) 
{
+                       final JobID jobId = jobGraph.getJobID();
+                       if (jobManagerRunners.containsKey(jobId)) {
+                               // we must not release the job graph lock since 
it can only be locked once and
+                               // is currently being executed. Once we support 
multiple locks, we must release
+                               // the JobGraph here
+                               log.debug("Ignore added JobGraph because the 
job {} is already running.", jobId);
+                               return CompletableFuture.completedFuture(true);
+                       } else if 
(runningJobsRegistry.getJobSchedulingStatus(jobId) != 
RunningJobsRegistry.JobSchedulingStatus.DONE) {
+                               return waitForTerminatingJobManager(jobId, 
jobGraph, this::runJob).thenApply(ignored -> true);
+                       } else {
+                               log.debug("Ignore added JobGraph because the 
job {} has already been completed.", jobId);
+                       }
+               }
+
+               return CompletableFuture.completedFuture(false);
+       }
+
        @Override
        public void onRemovedJobGraph(final JobID jobId) {
                runAsync(() -> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ddd3751cc2a..0fd4389fc35 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -708,6 +708,12 @@ protected static Configuration 
loadConfiguration(EntrypointClusterConfiguration
                        configuration.setInteger(RestOptions.PORT, restPort);
                }
 
+               final String hostname = 
entrypointClusterConfiguration.getHostname();
+
+               if (hostname != null) {
+                       configuration.setString(JobManagerOptions.ADDRESS, 
hostname);
+               }
+
                return configuration;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
index 75cad7aa946..3472f35d6a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.entrypoint;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -27,14 +28,23 @@
  */
 public class EntrypointClusterConfiguration extends ClusterConfiguration {
 
+       @Nullable
+       private final String hostname;
+
        private final int restPort;
 
-       public EntrypointClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) {
+       public EntrypointClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String 
hostname, int restPort) {
                super(configDir, dynamicProperties, args);
+               this.hostname = hostname;
                this.restPort = restPort;
        }
 
        public int getRestPort() {
                return restPort;
        }
+
+       @Nullable
+       public String getHostname() {
+               return hostname;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
index 7dfb784a79c..52f59eeef8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
@@ -29,6 +29,8 @@
 
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -42,6 +44,8 @@ public Options getOptions() {
                options.addOption(CONFIG_DIR_OPTION);
                options.addOption(REST_PORT_OPTION);
                options.addOption(DYNAMIC_PROPERTY_OPTION);
+               options.addOption(HOST_OPTION);
+               options.addOption(EXECUTION_MODE_OPTION);
 
                return options;
        }
@@ -52,11 +56,13 @@ public EntrypointClusterConfiguration createResult(@Nonnull 
CommandLine commandL
                final Properties dynamicProperties = 
commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
                final String restPortStr = 
commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
                final int restPort = Integer.parseInt(restPortStr);
+               final String hostname = 
commandLine.getOptionValue(HOST_OPTION.getOpt());
 
                return new EntrypointClusterConfiguration(
                        configDir,
                        dynamicProperties,
                        commandLine.getArgs(),
+                       hostname,
                        restPort);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
index 23c9da2485f..443014b9903 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
@@ -48,5 +48,26 @@
                .desc("use value for given property")
                .build();
 
+       public static final Option HOST_OPTION = Option.builder("h")
+               .longOpt("host")
+               .required(false)
+               .hasArg(true)
+               .argName("hostname")
+               .desc("Hostname for the RPC service.")
+               .build();
+
+       /**
+        * @deprecated exists only for compatibility with legacy mode. Remove 
once legacy mode
+        * and execution mode option have been removed.
+        */
+       @Deprecated
+       public static final Option EXECUTION_MODE_OPTION = Option.builder("x")
+               .longOpt("executionMode")
+               .required(false)
+               .hasArg(true)
+               .argName("execution mode")
+               .desc("Deprecated option")
+               .build();
+
        private CommandLineOptions() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2b935af229a..2fd19fbacf5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -146,9 +147,23 @@ public void stop() throws Exception {
                                jobGraphListener = null;
 
                                try {
-                                       pathCache.close();
-                               } catch (Exception e) {
-                                       throw new Exception("Could not properly 
stop the ZooKeeperSubmittedJobGraphStore.", e);
+                                       Exception exception = null;
+
+                                       try {
+                                               
jobGraphsInZooKeeper.releaseAll();
+                                       } catch (Exception e) {
+                                               exception = e;
+                                       }
+
+                                       try {
+                                               pathCache.close();
+                                       } catch (Exception e) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                       }
+
+                                       if (exception != null) {
+                                               throw new FlinkException("Could 
not properly stop the ZooKeeperSubmittedJobGraphStore.", exception);
+                                       }
                                } finally {
                                        isRunning = false;
                                }
@@ -264,9 +279,11 @@ public void removeJobGraph(JobID jobId) throws Exception {
 
                synchronized (cacheLock) {
                        if (addedJobGraphs.contains(jobId)) {
-                               jobGraphsInZooKeeper.releaseAndTryRemove(path);
-
-                               addedJobGraphs.remove(jobId);
+                               if 
(jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
+                                       addedJobGraphs.remove(jobId);
+                               } else {
+                                       throw new 
FlinkException(String.format("Could not remove job graph with job id %s from 
ZooKeeper.", jobId));
+                               }
                        }
                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4a66d32a2ac..736984e88e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1483,7 +1483,7 @@ private void rescaleJobGraph(Collection<JobVertexID> 
operators, int newParalleli
                                
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
                        }
 
-                       rescalingBehaviour.acceptWithException(jobVertex, 
newParallelism);
+                       rescalingBehaviour.accept(jobVertex, newParallelism);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de956081d8..64e2ffa1124 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@
        // rescaling is only executed if the operator can be set to the given 
parallelism
        STRICT {
                @Override
-               public void acceptWithException(JobVertex jobVertex, Integer 
newParallelism) throws FlinkException {
+               public void accept(JobVertex jobVertex, Integer newParallelism) 
throws FlinkException {
                        if (jobVertex.getMaxParallelism() < newParallelism) {
                                throw new FlinkException("Cannot rescale vertex 
" + jobVertex.getName() +
                                        " because its maximum parallelism " + 
jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public void acceptWithException(JobVertex jobVertex, Integer 
newParallelism) thr
        // the new parallelism will be the minimum of the given parallelism and 
the maximum parallelism
        RELAXED {
                @Override
-               public void acceptWithException(JobVertex jobVertex, Integer 
newParallelism) {
+               public void accept(JobVertex jobVertex, Integer newParallelism) 
{
                        
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), 
newParallelism));
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 28af072a10c..38da82cb7fb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -178,6 +178,7 @@ protected void initChannel(SocketChannel ch) {
                                .channel(NioServerSocketChannel.class)
                                .childHandler(initializer);
 
+                       log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
                        final ChannelFuture channel;
                        if (restBindAddress == null) {
                                channel = bootstrap.bind(restBindPort);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 4d783352459..9c8f7bd1428 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -89,6 +90,23 @@ public static ActorGateway retrieveLeaderGateway(
                }
        }
 
+       /**
+        * Retrieves the leader akka url and the current leader session ID. The 
values are stored in a
+        * {@link LeaderConnectionInfo} instance.
+        *
+        * @param leaderRetrievalService Leader retrieval service to retrieve 
the leader connection
+        *                               information
+        * @param timeout Timeout after which to give up looking for the leader
+        * @return LeaderConnectionInfo containing the leader's akka URL and 
the current leader session
+        * ID
+        * @throws LeaderRetrievalException
+        */
+       public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+                       LeaderRetrievalService leaderRetrievalService,
+                       Time timeout) throws LeaderRetrievalException {
+               return retrieveLeaderConnectionInfo(leaderRetrievalService, 
FutureUtils.toFiniteDuration(timeout));
+       }
+
        /**
         * Retrieves the leader akka url and the current leader session ID. The 
values are stored in a
         * {@link LeaderConnectionInfo} instance.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 8c3d31fc51b..b9cd0c1b720 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -526,22 +526,13 @@ protected String getLockPath(String rootPath) {
                                
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
                        } catch (KeeperException.NodeExistsException ignored) {
                                // we have already created the lock
-                       } catch (KeeperException.NoNodeException e) {
-                               throw new Exception("Cannot lock the node " + 
path + " since it does not exist.", e);
                        }
                }
 
                boolean success = false;
 
                try {
-                       byte[] data;
-
-                       try {
-                               data = client.getData().forPath(path);
-                       } catch (Exception e) {
-                               throw new Exception("Failed to retrieve state 
handle data under " + path +
-                                       " from ZooKeeper.", e);
-                       }
+                       byte[] data = client.getData().forPath(path);
 
                        try {
                                RetrievableStateHandle<T> 
retrievableStateHandle = InstantiationUtil.deserializeObject(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
new file mode 100644
index 00000000000..92bf1afab68
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+
+import java.io.Serializable;
+
+/**
+ * {@link RetrievableStateStorageHelper} implementation for testing purposes.
+ *
+ * @param <T> type of the element to store
+ */
+public final class TestingRetrievableStateStorageHelper<T extends 
Serializable> implements RetrievableStateStorageHelper<T> {
+
+       @Override
+       public RetrievableStateHandle<T> store(T state) {
+               return new TestingRetrievableStateHandle<>(state);
+       }
+
+       private static final class TestingRetrievableStateHandle<T extends 
Serializable> implements RetrievableStateHandle<T> {
+
+               private static final long serialVersionUID = 
137053380713794300L;
+
+               private final T state;
+
+               private TestingRetrievableStateHandle(T state) {
+                       this.state = state;
+               }
+
+               @Override
+               public T retrieveState() {
+                       return state;
+               }
+
+               @Override
+               public void discardState() {
+                       // no op
+               }
+
+               @Override
+               public long getStateSize() {
+                       return 0;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f992d3b00c0..a9cba8861c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,10 +22,8 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
@@ -36,8 +34,6 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -129,36 +125,4 @@ private ZooKeeperCompletedCheckpointStore 
createZooKeeperCheckpointStore(Curator
                        Executors.directExecutor());
        }
 
-       private static final class TestingRetrievableStateStorageHelper<T 
extends Serializable> implements RetrievableStateStorageHelper<T> {
-               @Override
-               public RetrievableStateHandle<T> store(T state) {
-                       return new TestingRetrievableStateHandle<>(state);
-               }
-
-               private static class TestingRetrievableStateHandle<T extends 
Serializable> implements RetrievableStateHandle<T> {
-
-                       private static final long serialVersionUID = 
137053380713794300L;
-
-                       private final T state;
-
-                       private TestingRetrievableStateHandle(T state) {
-                               this.state = state;
-                       }
-
-                       @Override
-                       public T retrieveState() throws IOException, 
ClassNotFoundException {
-                               return state;
-                       }
-
-                       @Override
-                       public void discardState() throws Exception {
-                               // no op
-                       }
-
-                       @Override
-                       public long getStateSize() {
-                               return 0;
-                       }
-               }
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index cb26f4862b1..335199a2807 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -302,9 +302,7 @@ public void removeJobGraph(JobID jobId) throws Exception {
                }
 
                @Override
-               public void releaseJobGraph(JobID jobId) throws Exception {
-                       throw new UnsupportedOperationException("Should not be 
called.");
-               }
+               public void releaseJobGraph(JobID jobId) throws Exception {}
 
                @Override
                public Collection<JobID> getJobIds() throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d405fcdcf44..1af10b8c598 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -319,13 +319,13 @@ public void testOnAddedJobGraphWithFinishedJob() throws 
Throwable {
                runningJobsRegistry.setJobFinished(TEST_JOB_ID);
                dispatcher.onAddedJobGraph(TEST_JOB_ID);
 
-               final CompletableFuture<Throwable> errorFuture = 
fatalErrorHandler.getErrorFuture();
-
-               final Throwable throwable = errorFuture.get();
+               // wait until the recovery is over
+               dispatcher.getRecoverOperationFuture(TIMEOUT).get();
 
-               assertThat(throwable, instanceOf(DispatcherException.class));
+               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-               fatalErrorHandler.clearError();
+               // check that we did not start executing the added JobGraph
+               assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), 
is(empty()));
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 5141be039f7..6a623768bb2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -73,16 +72,20 @@
                        VoidHistoryServerArchivist.INSTANCE);
        }
 
-       @VisibleForTesting
        void completeJobExecution(ArchivedExecutionGraph 
archivedExecutionGraph) {
                runAsync(
                        () -> 
jobReachedGloballyTerminalState(archivedExecutionGraph));
        }
 
-       @VisibleForTesting
-       public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID 
jobId, @Nonnull Time timeout) {
+       CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, 
@Nonnull Time timeout) {
                return callAsyncWithoutFencing(
                        () -> getJobTerminationFuture(jobId),
                        timeout).thenCompose(Function.identity());
        }
+
+       CompletableFuture<Void> getRecoverOperationFuture(@Nonnull Time 
timeout) {
+               return callAsyncWithoutFencing(
+                       this::getRecoveryOperation,
+                       timeout).thenCompose(Function.identity());
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index dd0375886a9..b5662c064e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -24,17 +24,25 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
@@ -56,8 +64,13 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -67,8 +80,8 @@
 
        private static final Time TIMEOUT = Time.seconds(10L);
 
-       @ClassRule
-       public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+       @Rule
+       public final ZooKeeperResource zooKeeperResource = new 
ZooKeeperResource();
 
        @ClassRule
        public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
@@ -87,14 +100,14 @@
        @BeforeClass
        public static void setupClass() throws IOException {
                configuration = new Configuration();
-               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOO_KEEPER_RESOURCE.getConnectString());
                
configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
                rpcService = new TestingRpcService();
                blobServer = new BlobServer(configuration, new VoidBlobStore());
        }
 
        @Before
-       public void setup() {
+       public void setup() throws Exception {
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
                testingFatalErrorHandler = new TestingFatalErrorHandler();
        }
 
@@ -139,7 +152,9 @@ public void testSubmittedJobGraphRelease() throws Exception 
{
                        final TestingLeaderElectionService 
leaderElectionService = new TestingLeaderElectionService();
                        
testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
 
-                       final TestingDispatcher dispatcher = 
createDispatcher(testingHighAvailabilityServices);
+                       final TestingDispatcher dispatcher = createDispatcher(
+                               testingHighAvailabilityServices,
+                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()));
 
                        dispatcher.start();
 
@@ -167,7 +182,7 @@ public void testSubmittedJobGraphRelease() throws Exception 
{
                                // recover the job
                                final SubmittedJobGraph submittedJobGraph = 
otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-                               assertThat(submittedJobGraph, 
Matchers.is(Matchers.notNullValue()));
+                               assertThat(submittedJobGraph, 
is(notNullValue()));
 
                                // check that the other submitted job graph 
store can remove the job graph after the original leader
                                // has lost its leadership
@@ -184,20 +199,145 @@ public void testSubmittedJobGraphRelease() throws 
Exception {
                }
        }
 
+       /**
+        * Tests that a standby Dispatcher does not interfere with the clean up 
of a completed
+        * job.
+        */
+       @Test
+       public void testStandbyDispatcherJobExecution() throws Exception {
+               try (final TestingHighAvailabilityServices haServices1 = new 
TestingHighAvailabilityServices();
+                       final TestingHighAvailabilityServices haServices2 = new 
TestingHighAvailabilityServices();
+                       final CuratorFramework curatorFramework = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+                       final ZooKeeperSubmittedJobGraphStore 
submittedJobGraphStore1 = 
ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+                       
haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
+                       final TestingLeaderElectionService 
leaderElectionService1 = new TestingLeaderElectionService();
+                       
haServices1.setDispatcherLeaderElectionService(leaderElectionService1);
+
+                       final ZooKeeperSubmittedJobGraphStore 
submittedJobGraphStore2 = 
ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+                       
haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
+                       final TestingLeaderElectionService 
leaderElectionService2 = new TestingLeaderElectionService();
+                       
haServices2.setDispatcherLeaderElectionService(leaderElectionService2);
+
+                       final CompletableFuture<JobGraph> jobGraphFuture = new 
CompletableFuture<>();
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = new CompletableFuture<>();
+                       final TestingDispatcher dispatcher1 = createDispatcher(
+                               haServices1,
+                               new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+
+                       final TestingDispatcher dispatcher2 = createDispatcher(
+                               haServices2,
+                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()));
+
+                       try {
+                               dispatcher1.start();
+                               dispatcher2.start();
+
+                               
leaderElectionService1.isLeader(UUID.randomUUID()).get();
+                               final DispatcherGateway dispatcherGateway1 = 
dispatcher1.getSelfGateway(DispatcherGateway.class);
+
+                               final JobGraph jobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+
+                               dispatcherGateway1.submitJob(jobGraph, 
TIMEOUT).get();
+
+                               final CompletableFuture<JobResult> 
jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), 
TIMEOUT);
+
+                               jobGraphFuture.get();
+
+                               // complete the job
+                               resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
+                               final JobResult jobResult = 
jobResultFuture.get();
+
+                               assertThat(jobResult.isSuccess(), is(true));
+
+                               // wait for the completion of the job
+                               
dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+                               // change leadership
+                               leaderElectionService1.notLeader();
+                               
leaderElectionService2.isLeader(UUID.randomUUID()).get();
+
+                               // Dispatcher 2 should not recover any jobs
+                               final DispatcherGateway dispatcherGateway2 = 
dispatcher2.getSelfGateway(DispatcherGateway.class);
+                               
assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), is(empty()));
+                       } finally {
+                               RpcUtils.terminateRpcEndpoint(dispatcher1, 
TIMEOUT);
+                               RpcUtils.terminateRpcEndpoint(dispatcher2, 
TIMEOUT);
+                       }
+               }
+       }
+
+       /**
+        * Tests that a standby {@link Dispatcher} can recover all submitted 
jobs.
+        */
+       @Test
+       public void testStandbyDispatcherJobRecovery() throws Exception {
+               try (CuratorFramework curatorFramework = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+                       HighAvailabilityServices haServices = null;
+                       Dispatcher dispatcher1 = null;
+                       Dispatcher dispatcher2 = null;
+
+                       try {
+                               haServices = new 
ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, 
new VoidBlobStore());
+
+                               final CompletableFuture<JobGraph> 
jobGraphFuture1 = new CompletableFuture<>();
+                               dispatcher1 = createDispatcher(
+                                       haServices,
+                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+                               final CompletableFuture<JobGraph> 
jobGraphFuture2 = new CompletableFuture<>();
+                               dispatcher2 = createDispatcher(
+                                       haServices,
+                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+
+                               dispatcher1.start();
+                               dispatcher2.start();
+
+                               final LeaderConnectionInfo leaderConnectionInfo 
= 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(),
 TIMEOUT);
+
+                               final DispatcherGateway dispatcherGateway = 
rpcService.connect(leaderConnectionInfo.getAddress(), 
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), 
DispatcherGateway.class).get();
+
+                               final JobGraph nonEmptyJobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+                               dispatcherGateway.submitJob(nonEmptyJobGraph, 
TIMEOUT).get();
+
+                               if 
(dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
+                                       dispatcher1.shutDown();
+                                       
assertThat(jobGraphFuture2.get().getJobID(), 
is(equalTo(nonEmptyJobGraph.getJobID())));
+                               } else {
+                                       dispatcher2.shutDown();
+                                       
assertThat(jobGraphFuture1.get().getJobID(), 
is(equalTo(nonEmptyJobGraph.getJobID())));
+                               }
+                       } finally {
+                               if (dispatcher1 != null) {
+                                       
RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+                               }
+
+                               if (dispatcher2 != null) {
+                                       
RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+                               }
+
+                               if (haServices != null) {
+                                       haServices.close();
+                               }
+                       }
+               }
+       }
+
        @Nonnull
-       private TestingDispatcher 
createDispatcher(TestingHighAvailabilityServices 
testingHighAvailabilityServices) throws Exception {
+       private TestingDispatcher createDispatcher(HighAvailabilityServices 
highAvailabilityServices, Dispatcher.JobManagerRunnerFactory 
jobManagerRunnerFactory) throws Exception {
                return new TestingDispatcher(
                        rpcService,
-                       Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+                       Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() 
+ UUID.randomUUID(),
                        configuration,
-                       testingHighAvailabilityServices,
+                       highAvailabilityServices,
                        new TestingResourceManagerGateway(),
                        blobServer,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        null,
                        new MemoryArchivedExecutionGraphStore(),
-                       new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()),
+                       jobManagerRunnerFactory,
                        testingFatalErrorHandler);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
index da63b7fe046..906e9d5a663 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
@@ -46,7 +46,7 @@ public void testEntrypointClusterConfigurationParsing() 
throws FlinkParseExcepti
                final String value = "value";
                final String arg1 = "arg1";
                final String arg2 = "arg2";
-               final String[] args = {"--configDir", configDir, "-r", 
String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+               final String[] args = {"--configDir", configDir, 
"--executionMode", "cluster", "--host", "localhost",  "-r", 
String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
 
                final EntrypointClusterConfiguration clusterConfiguration = 
commandLineParser.parse(args);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
new file mode 100644
index 00000000000..fae84591b55
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import 
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ZooKeeperSubmittedJobGraphStore}.
+ */
+public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
+
+       @Rule
+       public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
+       private Configuration configuration;
+
+       @Before
+       public void setup() {
+               configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
+       }
+
+       /**
+        * Tests that we fail with an exception if the job cannot be removed 
from the
+        * ZooKeeperSubmittedJobGraphStore.
+        *
+        * <p>Tests that a closed ZooKeeperSubmittedJobGraphStore no longer 
holds any locks.
+        */
+       @Test
+       public void testJobGraphRemovalFailureAndLockRelease() throws Exception 
{
+               try (final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+                       final 
TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new 
TestingRetrievableStateStorageHelper<>();
+                       final ZooKeeperSubmittedJobGraphStore 
submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+                       submittedJobGraphStore.start(null);
+                       final ZooKeeperSubmittedJobGraphStore 
otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, 
stateStorage);
+                       otherSubmittedJobGraphStore.start(null);
+
+                       final SubmittedJobGraph jobGraph = new 
SubmittedJobGraph(new JobGraph(), null);
+                       submittedJobGraphStore.putJobGraph(jobGraph);
+
+                       final SubmittedJobGraph recoveredJobGraph = 
otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId());
+
+                       assertThat(recoveredJobGraph, is(notNullValue()));
+
+                       try {
+                               
otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+                               fail("It should not be possible to remove the 
JobGraph since the first store still has a lock on it.");
+                       } catch (Exception ignored) {
+                               // expected
+                       }
+
+                       submittedJobGraphStore.stop();
+
+                       // now we should be able to delete the job graph
+                       
otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+
+                       
assertThat(otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobId()),
 is(nullValue()));
+
+                       otherSubmittedJobGraphStore.stop();
+               }
+       }
+
+       @Nonnull
+       public ZooKeeperSubmittedJobGraphStore 
createSubmittedJobGraphStore(CuratorFramework client, 
TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws 
Exception {
+               return new ZooKeeperSubmittedJobGraphStore(
+                       client,
+                       "/foobar",
+                       stateStorage);
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c5786ca4..bf8751547ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public synchronized SubmittedJobGraph recoverJobGraph(JobID 
jobId) throws Except
                verifyIsStarted();
 
                if (recoverJobGraphFunction != null) {
-                       return 
recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+                       return recoverJobGraphFunction.apply(jobId, storedJobs);
                } else {
                        return requireNonNull(
                                storedJobs.get(jobId),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail with exception if job cannot be removed by 
> ZooKeeperSubmittedJobGraphStore#removeJobGraph
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10329
>                 URL: https://issues.apache.org/jira/browse/FLINK-10329
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Callers of {{ZooKeeperSubmittedJobGraphStore#removeJobGraph}} expect that we 
> fail with an exception if the {{JobGraph}} cannot be removed. This is currently 
> not the case because we internally call {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. 
> If this method returns {{false}}, then we need to fail with an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to