asfgit closed pull request #6699: [FLINK-10314] Making JobManagerRunner 
creation non-blocking in Dispatcher
URL: https://github.com/apache/flink/pull/6699
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index 678ef9f78b6..83846a62b64 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -32,6 +32,19 @@ private FunctionUtils() {
                throw new UnsupportedOperationException("This class should 
never be instantiated.");
        }
 
+       private static final Function<Object, Void> NULL_FN = ignored -> null;
+
+       /**
+        * Function which returns {@code null} (type: Void).
+        *
+        * @param <T> input type
+        * @return Function which returns {@code null}.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> Function<T, Void> nullFn() {
+               return (Function<T, Void>) NULL_FN;
+       }
+
        /**
         * Convert at {@link FunctionWithException} into a {@link Function}.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5279e502a93..a1da2131e48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -63,10 +63,11 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.ThrowingConsumer;
-import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -110,7 +111,7 @@
 
        private final FatalErrorHandler fatalErrorHandler;
 
-       private final Map<JobID, JobManagerRunner> jobManagerRunners;
+       private final Map<JobID, CompletableFuture<JobManagerRunner>> 
jobManagerRunnerFutures;
 
        private final LeaderElectionService leaderElectionService;
 
@@ -166,7 +167,7 @@ public Dispatcher(
 
                this.runningJobsRegistry = 
highAvailabilityServices.getRunningJobsRegistry();
 
-               jobManagerRunners = new HashMap<>(16);
+               jobManagerRunnerFutures = new HashMap<>(16);
 
                leaderElectionService = 
highAvailabilityServices.getDispatcherLeaderElectionService();
 
@@ -248,7 +249,7 @@ public void start() throws Exception {
                        return FutureUtils.completedExceptionally(new 
FlinkException(String.format("Failed to retrieve job scheduling status for job 
%s.", jobId), e));
                }
 
-               if (jobSchedulingStatus == 
RunningJobsRegistry.JobSchedulingStatus.DONE || 
jobManagerRunners.containsKey(jobId)) {
+               if (jobSchedulingStatus == 
RunningJobsRegistry.JobSchedulingStatus.DONE || 
jobManagerRunnerFutures.containsKey(jobId)) {
                        return FutureUtils.completedExceptionally(
                                new JobSubmissionException(jobId, 
String.format("Job has already been submitted and is in state %s.", 
jobSchedulingStatus)));
                } else {
@@ -257,58 +258,72 @@ public void start() throws Exception {
 
                        return persistAndRunFuture.exceptionally(
                                (Throwable throwable) -> {
+                                       final Throwable strippedThrowable = 
ExceptionUtils.stripCompletionException(throwable);
+                                       log.error("Failed to submit job {}.", 
jobId, strippedThrowable);
                                        throw new CompletionException(
-                                               new 
JobSubmissionException(jobId, "Failed to submit job.", throwable));
+                                               new 
JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
                                });
                }
        }
 
-       private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+       private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) 
throws Exception {
                submittedJobGraphStore.putJobGraph(new 
SubmittedJobGraph(jobGraph, null));
 
-               try {
-                       runJob(jobGraph);
-               } catch (Exception e) {
-                       try {
+               final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
+
+               return 
runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, 
Throwable throwable) -> {
+                       if (throwable != null) {
                                
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
-                       } catch (Exception ie) {
-                               e.addSuppressed(ie);
                        }
-
-                       throw e;
-               }
+               }));
        }
 
-       private void runJob(JobGraph jobGraph) throws Exception {
-               
Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
+       private CompletableFuture<Void> runJob(JobGraph jobGraph) {
+               
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
 
-               final JobManagerRunner jobManagerRunner = 
createJobManagerRunner(jobGraph);
+               final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
 
-               jobManagerRunner.start();
+               jobManagerRunnerFutures.put(jobGraph.getJobID(), 
jobManagerRunnerFuture);
 
-               jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
+               return jobManagerRunnerFuture
+                       .thenApply(FunctionUtils.nullFn())
+                       .whenCompleteAsync(
+                               (ignored, throwable) -> {
+                                       if (throwable != null) {
+                                               
jobManagerRunnerFutures.remove(jobGraph.getJobID());
+                                       }
+                               },
+                               getMainThreadExecutor());
        }
 
-       private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) 
throws Exception {
-               final JobID jobId = jobGraph.getJobID();
-
-               final JobManagerRunner jobManagerRunner = 
jobManagerRunnerFactory.createJobManagerRunner(
-                       ResourceID.generate(),
-                       jobGraph,
-                       configuration,
-                       getRpcService(),
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       blobServer,
-                       jobManagerSharedServices,
-                       new 
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
-                       fatalErrorHandler);
+       private CompletableFuture<JobManagerRunner> 
createJobManagerRunner(JobGraph jobGraph) {
+               final RpcService rpcService = getRpcService();
+
+               final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+                       CheckedSupplier.unchecked(() ->
+                               jobManagerRunnerFactory.createJobManagerRunner(
+                                       ResourceID.generate(),
+                                       jobGraph,
+                                       configuration,
+                                       rpcService,
+                                       highAvailabilityServices,
+                                       heartbeatServices,
+                                       blobServer,
+                                       jobManagerSharedServices,
+                                       new 
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+                                       fatalErrorHandler)),
+                       rpcService.getExecutor());
+
+               return 
jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
+       }
 
+       private JobManagerRunner startJobManagerRunner(JobManagerRunner 
jobManagerRunner) throws Exception {
+               final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
                jobManagerRunner.getResultFuture().whenCompleteAsync(
                        (ArchivedExecutionGraph archivedExecutionGraph, 
Throwable throwable) -> {
                                // check if we are still the active 
JobManagerRunner by checking the identity
                                //noinspection ObjectEquality
-                               if (jobManagerRunner == 
jobManagerRunners.get(jobId)) {
+                               if (jobManagerRunner == 
jobManagerRunnerFutures.get(jobId).getNow(null)) {
                                        if (archivedExecutionGraph != null) {
                                                
jobReachedGloballyTerminalState(archivedExecutionGraph);
                                        } else {
@@ -325,13 +340,15 @@ private JobManagerRunner createJobManagerRunner(JobGraph 
jobGraph) throws Except
                                }
                        }, getMainThreadExecutor());
 
+               jobManagerRunner.start();
+
                return jobManagerRunner;
        }
 
        @Override
        public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
                return CompletableFuture.completedFuture(
-                       Collections.unmodifiableSet(new 
HashSet<>(jobManagerRunners.keySet())));
+                       Collections.unmodifiableSet(new 
HashSet<>(jobManagerRunnerFutures.keySet())));
        }
 
        @Override
@@ -481,9 +498,9 @@ private JobManagerRunner createJobManagerRunner(JobGraph 
jobGraph) throws Except
 
        @Override
        public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time 
timeout) {
-               final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
+               final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-               if (jobManagerRunner == null) {
+               if (jobManagerRunnerFuture == null) {
                        final ArchivedExecutionGraph archivedExecutionGraph = 
archivedExecutionGraphStore.get(jobId);
 
                        if (archivedExecutionGraph == null) {
@@ -492,7 +509,7 @@ private JobManagerRunner createJobManagerRunner(JobGraph 
jobGraph) throws Except
                                return 
CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
                        }
                } else {
-                       return 
jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
+                       return 
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
                }
        }
 
@@ -566,11 +583,11 @@ private void 
registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
        }
 
        private CompletableFuture<Void> removeJob(JobID jobId, boolean 
cleanupHA) {
-               JobManagerRunner jobManagerRunner = 
jobManagerRunners.remove(jobId);
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = 
jobManagerRunnerFutures.remove(jobId);
 
                final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
-               if (jobManagerRunner != null) {
-                       jobManagerRunnerTerminationFuture = 
jobManagerRunner.closeAsync();
+               if (jobManagerRunnerFuture != null) {
+                       jobManagerRunnerTerminationFuture = 
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
                } else {
                        jobManagerRunnerTerminationFuture = 
CompletableFuture.completedFuture(null);
                }
@@ -616,7 +633,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) 
{
        private void terminateJobManagerRunners() {
                log.info("Stopping all currently running jobs of dispatcher 
{}.", getAddress());
 
-               final HashSet<JobID> jobsToRemove = new 
HashSet<>(jobManagerRunners.keySet());
+               final HashSet<JobID> jobsToRemove = new 
HashSet<>(jobManagerRunnerFutures.keySet());
 
                for (JobID jobId : jobsToRemove) {
                        removeJobAndRegisterTerminationFuture(jobId, false);
@@ -739,16 +756,16 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
        }
 
        private CompletableFuture<JobMasterGateway> 
getJobMasterGatewayFuture(JobID jobId) {
-               final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
+               final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-               if (jobManagerRunner == null) {
+               if (jobManagerRunnerFuture == null) {
                        return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
                } else {
-                       final CompletableFuture<JobMasterGateway> 
leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+                       final CompletableFuture<JobMasterGateway> 
leaderGatewayFuture = 
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
                        return leaderGatewayFuture.thenApplyAsync(
                                (JobMasterGateway jobMasterGateway) -> {
                                        // check whether the retrieved 
JobMasterGateway belongs still to a running JobMaster
-                                       if 
(jobManagerRunners.containsKey(jobId)) {
+                                       if 
(jobManagerRunnerFutures.containsKey(jobId)) {
                                                return jobMasterGateway;
                                        } else {
                                                throw new 
CompletionException(new FlinkJobNotFoundException(jobId));
@@ -764,12 +781,12 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
 
        @Nonnull
        private <T> List<CompletableFuture<Optional<T>>> 
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> 
queryFunction) {
-               final int numberJobsRunning = jobManagerRunners.size();
+               final int numberJobsRunning = jobManagerRunnerFutures.size();
 
                ArrayList<CompletableFuture<Optional<T>>> 
optionalJobInformation = new ArrayList<>(
                        numberJobsRunning);
 
-               for (JobID jobId : jobManagerRunners.keySet()) {
+               for (JobID jobId : jobManagerRunnerFutures.keySet()) {
                        final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
 
                        final CompletableFuture<Optional<T>> optionalRequest = 
jobMasterGatewayFuture
@@ -836,10 +853,10 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
                        log.debug("Dispatcher {} accepted leadership with 
fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
                        setNewFencingToken(dispatcherId);
 
-                       Collection<CompletableFuture<Void>> runFutures = new 
ArrayList<>(recoveredJobs.size());
+                       Collection<CompletableFuture<?>> runFutures = new 
ArrayList<>(recoveredJobs.size());
 
                        for (JobGraph recoveredJob : recoveredJobs) {
-                               final CompletableFuture<Void> runFuture = 
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, 
this::runJob);
+                               final CompletableFuture<?> runFuture = 
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, 
this::runJob);
                                runFutures.add(runFuture);
                        }
 
@@ -850,7 +867,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
                }
        }
 
-       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
+       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, FunctionWithException<JobGraph, 
CompletableFuture<Void>, ?> action) {
                final CompletableFuture<Void> jobManagerTerminationFuture = 
getJobTerminationFuture(jobId)
                        .exceptionally((Throwable throwable) -> {
                                throw new CompletionException(
@@ -858,16 +875,16 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
                                                String.format("Termination of 
previous JobManager for job %s failed. Cannot submit job under the same job 
id.", jobId),
                                                throwable)); });
 
-               return jobManagerTerminationFuture.thenRunAsync(
-                       ThrowingRunnable.unchecked(() -> {
+               return jobManagerTerminationFuture.thenComposeAsync(
+                       FunctionUtils.uncheckedFunction((ignored) -> {
                                jobManagerTerminationFutures.remove(jobId);
-                               action.accept(jobGraph);
+                               return action.apply(jobGraph);
                        }),
                        getMainThreadExecutor());
        }
 
        CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-               if (jobManagerRunners.containsKey(jobId)) {
+               if (jobManagerRunnerFutures.containsKey(jobId)) {
                        return FutureUtils.completedExceptionally(new 
DispatcherException(String.format("Job with job id %s is still running.", 
jobId)));
                } else {
                        return jobManagerTerminationFutures.getOrDefault(jobId, 
CompletableFuture.completedFuture(null));
@@ -923,7 +940,7 @@ public void handleError(final Exception exception) {
        public void onAddedJobGraph(final JobID jobId) {
                runAsync(
                        () -> {
-                               if (!jobManagerRunners.containsKey(jobId)) {
+                               if 
(!jobManagerRunnerFutures.containsKey(jobId)) {
                                        // IMPORTANT: onAddedJobGraph can 
generate false positives and, thus, we must expect that
                                        // the specified job is already removed 
from the SubmittedJobGraphStore. In this case,
                                        // SubmittedJobGraphStore.recoverJob 
returns null.
@@ -962,7 +979,7 @@ public void onAddedJobGraph(final JobID jobId) {
        private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph 
jobGraph, DispatcherId dispatcherId) throws Exception {
                if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) 
{
                        final JobID jobId = jobGraph.getJobID();
-                       if (jobManagerRunners.containsKey(jobId)) {
+                       if (jobManagerRunnerFutures.containsKey(jobId)) {
                                // we must not release the job graph lock since 
it can only be locked once and
                                // is currently being executed. Once we support 
multiple locks, we must release
                                // the JobGraph here
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 335199a2807..c825451d946 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -163,7 +163,7 @@ private HATestingDispatcher 
createHADispatcher(TestingHighAvailabilityServices h
                        
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        null,
                        new MemoryArchivedExecutionGraphStore(),
-                       new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()),
+                       new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>(), 
CompletableFuture.completedFuture(null)),
                        testingFatalErrorHandler,
                        fencingTokens);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d09ab8df728..5c4ac34a314 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,26 +29,19 @@
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TestingBlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -79,8 +72,6 @@
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests the resource cleanup by the {@link Dispatcher}.
@@ -188,7 +179,7 @@ public void setup() throws Exception {
                        
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        null,
                        new MemoryArchivedExecutionGraphStore(),
-                       new TestingJobManagerRunnerFactory(resultFuture, 
terminationFuture),
+                       new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), resultFuture, terminationFuture),
                        fatalErrorHandler);
 
                dispatcher.start();
@@ -447,28 +438,6 @@ public void 
testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception
                assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
        }
 
-       private static final class TestingJobManagerRunnerFactory implements 
Dispatcher.JobManagerRunnerFactory {
-
-               private final CompletableFuture<ArchivedExecutionGraph> 
resultFuture;
-
-               private final CompletableFuture<Void> terminationFuture;
-
-               private 
TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> 
resultFuture, CompletableFuture<Void> terminationFuture) {
-                       this.resultFuture = resultFuture;
-                       this.terminationFuture = terminationFuture;
-               }
-
-               @Override
-               public JobManagerRunner createJobManagerRunner(ResourceID 
resourceId, JobGraph jobGraph, Configuration configuration, RpcService 
rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, BlobServer blobServer, 
JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
-                       final JobManagerRunner jobManagerRunnerMock = 
mock(JobManagerRunner.class);
-
-                       
when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
-                       
when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
-
-                       return jobManagerRunnerMock;
-               }
-       }
-
        private static final class TestingBlobServer extends BlobServer {
 
                private final CompletableFuture<JobID> cleanupJobFuture;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1af10b8c598..24426761b2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -64,6 +65,7 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -86,7 +88,9 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -144,6 +148,10 @@
        /** Instance under test. */
        private TestingDispatcher dispatcher;
 
+       private TestingHighAvailabilityServices haServices;
+
+       private HeartbeatServices heartbeatServices;
+
        @BeforeClass
        public static void setupClass() {
                rpcService = new TestingRpcService();
@@ -166,13 +174,13 @@ public void setUp() throws Exception {
                jobGraph.setAllowQueuedScheduling(true);
 
                fatalErrorHandler = new TestingFatalErrorHandler();
-               final HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 10000L);
+               heartbeatServices = new HeartbeatServices(1000L, 10000L);
                submittedJobGraphStore = new FaultySubmittedJobGraphStore();
 
                dispatcherLeaderElectionService = new 
TestingLeaderElectionService();
                jobMasterLeaderElectionService = new 
TestingLeaderElectionService();
 
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               haServices = new TestingHighAvailabilityServices();
                
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
                haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
                haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, 
jobMasterLeaderElectionService);
@@ -188,14 +196,18 @@ public void setUp() throws Exception {
 
                createdJobManagerRunnerLatch = new CountDownLatch(2);
                blobServer = new BlobServer(configuration, new VoidBlobStore());
+       }
 
-               dispatcher = createDispatcher(heartbeatServices, haServices);
-
+       @Nonnull
+       private TestingDispatcher createAndStartDispatcher(HeartbeatServices 
heartbeatServices, TestingHighAvailabilityServices haServices, 
Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+               final TestingDispatcher dispatcher = 
createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
                dispatcher.start();
+
+               return dispatcher;
        }
 
        @Nonnull
-       private TestingDispatcher createDispatcher(HeartbeatServices 
heartbeatServices, TestingHighAvailabilityServices haServices) throws Exception 
{
+       private TestingDispatcher createDispatcher(HeartbeatServices 
heartbeatServices, TestingHighAvailabilityServices haServices, 
Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
                return new TestingDispatcher(
                        rpcService,
                        Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -207,7 +219,7 @@ private TestingDispatcher 
createDispatcher(HeartbeatServices heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        null,
                        new MemoryArchivedExecutionGraphStore(),
-                       new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch),
+                       jobManagerRunnerFactory,
                        fatalErrorHandler);
        }
 
@@ -216,7 +228,13 @@ public void tearDown() throws Exception {
                try {
                        fatalErrorHandler.rethrowError();
                } finally {
-                       RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+                       if (dispatcher != null) {
+                               RpcUtils.terminateRpcEndpoint(dispatcher, 
TIMEOUT);
+                       }
+               }
+
+               if (haServices != null) {
+                       haServices.closeAndCleanupAllData();
                }
 
                if (blobServer != null) {
@@ -230,6 +248,8 @@ public void tearDown() throws Exception {
         */
        @Test
        public void testJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
                // wait for the leader to be elected
@@ -251,6 +271,8 @@ public void testJobSubmission() throws Exception {
         */
        @Test
        public void testLeaderElection() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                CompletableFuture<Void> jobIdsFuture = new 
CompletableFuture<>();
                submittedJobGraphStore.setJobIdsFunction(
                        (Collection<JobID> jobIds) -> {
@@ -270,6 +292,8 @@ public void testLeaderElection() throws Exception {
         */
        @Test
        public void testSubmittedJobGraphListener() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -294,6 +318,8 @@ public void testSubmittedJobGraphListener() throws 
Exception {
 
        @Test
        public void testOnAddedJobGraphRecoveryFailure() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final FlinkException expectedFailure = new 
FlinkException("Expected failure");
                submittedJobGraphStore.setRecoveryFailure(expectedFailure);
 
@@ -313,6 +339,8 @@ public void testOnAddedJobGraphRecoveryFailure() throws 
Exception {
 
        @Test
        public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                submittedJobGraphStore.putJobGraph(new 
SubmittedJobGraph(jobGraph, null));
@@ -333,6 +361,8 @@ public void testOnAddedJobGraphWithFinishedJob() throws 
Throwable {
         */
        @Test
        public void testCacheJobExecutionResult() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -358,6 +388,8 @@ public void testCacheJobExecutionResult() throws Exception {
 
        @Test
        public void testThrowExceptionIfJobExecutionResultNotFound() throws 
Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -374,6 +406,8 @@ public void 
testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
         */
        @Test
        public void testJobRecovery() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
                // elect the initial dispatcher as the leader
@@ -413,6 +447,8 @@ public void testSavepointDisposal() throws Exception {
                final URI externalPointer = createTestingSavepoint();
                final Path savepointPath = Paths.get(externalPointer);
 
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -447,7 +483,9 @@ private URI createTestingSavepoint() throws IOException, 
URISyntaxException {
         * to it. See FLINK-8887.
         */
        @Test
-       public void testWaitingForJobMasterLeadership() throws 
ExecutionException, InterruptedException {
+       public void testWaitingForJobMasterLeadership() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
                
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -476,6 +514,8 @@ public void testWaitingForJobMasterLeadership() throws 
ExecutionException, Inter
         */
        @Test
        public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final FlinkException testException = new FlinkException("Test 
exception");
                submittedJobGraphStore.setJobIdsFunction(
                        (Collection<JobID> jobIds) -> {
@@ -500,6 +540,8 @@ public void testFatalErrorAfterJobIdRecoveryFailure() 
throws Exception {
        public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
                final FlinkException testException = new FlinkException("Test 
exception");
 
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final SubmittedJobGraph submittedJobGraph = new 
SubmittedJobGraph(jobGraph, null);
                submittedJobGraphStore.putJobGraph(submittedJobGraph);
 
@@ -526,6 +568,8 @@ public void testFatalErrorAfterJobRecoveryFailure() throws 
Exception {
        public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
                final FlinkException testException = new FlinkException("Test 
exception");
 
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+
                final JobGraph failingJobGraph = 
createFailingJobGraph(testException);
 
                final SubmittedJobGraph submittedJobGraph = new 
SubmittedJobGraph(failingJobGraph, null);
@@ -540,6 +584,102 @@ public void testJobSubmissionErrorAfterJobRecovery() 
throws Exception {
                fatalErrorHandler.clearError();
        }
 
+       /**
+        * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to 
blocking FileSystem access,
+        * does not block the {@link Dispatcher}.
+        *
+        * <p>See FLINK-10314
+        */
+       @Test
+       public void testBlockingJobManagerRunner() throws Exception {
+               final OneShotLatch jobManagerRunnerCreationLatch = new 
OneShotLatch();
+               dispatcher = createAndStartDispatcher(
+                       heartbeatServices,
+                       haServices,
+                       new 
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
+
+               
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               final CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+               assertThat(submissionFuture.isDone(), is(false));
+
+               final CompletableFuture<Collection<String>> 
metricQueryServicePathsFuture = 
dispatcherGateway.requestMetricQueryServicePaths(Time.seconds(5L));
+
+               assertThat(metricQueryServicePathsFuture.get(), is(empty()));
+
+               assertThat(submissionFuture.isDone(), is(false));
+
+               jobManagerRunnerCreationLatch.trigger();
+
+               submissionFuture.get();
+       }
+
+       /**
+        * Tests that a failing {@link JobManagerRunner} will be properly 
cleaned up.
+        */
+       @Test
+       public void testFailingJobManagerRunnerCleanup() throws Exception {
+               final FlinkException testException = new FlinkException("Test 
exception.");
+               final ArrayBlockingQueue<Optional<Exception>> queue = new 
ArrayBlockingQueue<>(2);
+
+               dispatcher = createAndStartDispatcher(
+                       heartbeatServices,
+                       haServices,
+                       new BlockingJobManagerRunnerFactory(() -> {
+                               final Optional<Exception> take = queue.take();
+                               final Exception exception = take.orElse(null);
+
+                               if (exception != null) {
+                                       throw exception;
+                               }
+                       }));
+
+               
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+               assertThat(submissionFuture.isDone(), is(false));
+
+               queue.offer(Optional.of(testException));
+
+               try {
+                       submissionFuture.get();
+                       fail("Should fail because we could not instantiate the 
JobManagerRunner.");
+               } catch (Exception e) {
+                       assertThat(ExceptionUtils.findThrowable(e, t -> 
t.equals(testException)).isPresent(), is(true));
+               }
+
+               submissionFuture = dispatcherGateway.submitJob(jobGraph, 
TIMEOUT);
+
+               queue.offer(Optional.empty());
+
+               submissionFuture.get();
+       }
+
+       private final class BlockingJobManagerRunnerFactory extends 
TestingJobManagerRunnerFactory {
+
+               @Nonnull
+               private final ThrowingRunnable<Exception> 
jobManagerRunnerCreationLatch;
+
+               BlockingJobManagerRunnerFactory(@Nonnull 
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+                       super(new CompletableFuture<>(), new 
CompletableFuture<>(), CompletableFuture.completedFuture(null));
+
+                       this.jobManagerRunnerCreationLatch = 
jobManagerRunnerCreationLatch;
+               }
+
+               @Override
+               public JobManagerRunner createJobManagerRunner(ResourceID 
resourceId, JobGraph jobGraph, Configuration configuration, RpcService 
rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, BlobServer blobServer, 
JobManagerSharedServices jobManagerSharedServices, 
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, 
FatalErrorHandler fatalErrorHandler) throws Exception {
+                       jobManagerRunnerCreationLatch.run();
+
+                       return super.createJobManagerRunner(resourceId, 
jobGraph, configuration, rpcService, highAvailabilityServices, 
heartbeatServices, blobServer, jobManagerSharedServices, 
jobManagerJobMetricGroupFactory, fatalErrorHandler);
+               }
+       }
+
        private void electDispatcher() {
                UUID expectedLeaderSessionId = UUID.randomUUID();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 39c06f36ecf..eed23ed7179 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -125,7 +125,7 @@ public void setup() throws Exception {
                jobGraphFuture = new CompletableFuture<>();
                resultFuture = new CompletableFuture<>();
 
-               testingJobManagerRunnerFactory = new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
+               testingJobManagerRunnerFactory = new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, 
CompletableFuture.completedFuture(null));
        }
 
        @After
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index f9be8883589..992f08713c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -40,14 +40,16 @@
  * {@link 
org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} 
implementation for
  * testing purposes.
  */
-final class TestingJobManagerRunnerFactory implements 
Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements 
Dispatcher.JobManagerRunnerFactory {
 
        private final CompletableFuture<JobGraph> jobGraphFuture;
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+       private final CompletableFuture<Void> terminationFuture;
 
-       TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> 
jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+       TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> 
jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, 
CompletableFuture<Void> terminationFuture) {
                this.jobGraphFuture = jobGraphFuture;
                this.resultFuture = resultFuture;
+               this.terminationFuture = terminationFuture;
        }
 
        @Override
@@ -66,7 +68,8 @@ public JobManagerRunner createJobManagerRunner(
 
                final JobManagerRunner mock = mock(JobManagerRunner.class);
                when(mock.getResultFuture()).thenReturn(resultFuture);
-               
when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+               when(mock.closeAsync()).thenReturn(terminationFuture);
+               when(mock.getJobGraph()).thenReturn(jobGraph);
 
                return mock;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index b5662c064e8..9c23f9d7aa2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -154,7 +154,7 @@ public void testSubmittedJobGraphRelease() throws Exception 
{
 
                        final TestingDispatcher dispatcher = createDispatcher(
                                testingHighAvailabilityServices,
-                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()));
+                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>(), 
CompletableFuture.completedFuture(null)));
 
                        dispatcher.start();
 
@@ -223,11 +223,11 @@ public void testStandbyDispatcherJobExecution() throws 
Exception {
                        final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = new CompletableFuture<>();
                        final TestingDispatcher dispatcher1 = createDispatcher(
                                haServices1,
-                               new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+                               new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, 
CompletableFuture.completedFuture(null)));
 
                        final TestingDispatcher dispatcher2 = createDispatcher(
                                haServices2,
-                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()));
+                               new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>(), 
CompletableFuture.completedFuture(null)));
 
                        try {
                                dispatcher1.start();
@@ -285,11 +285,11 @@ public void testStandbyDispatcherJobRecovery() throws 
Exception {
                                final CompletableFuture<JobGraph> 
jobGraphFuture1 = new CompletableFuture<>();
                                dispatcher1 = createDispatcher(
                                        haServices,
-                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>(), 
CompletableFuture.completedFuture(null)));
                                final CompletableFuture<JobGraph> 
jobGraphFuture2 = new CompletableFuture<>();
                                dispatcher2 = createDispatcher(
                                        haServices,
-                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+                                       new 
TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>(), 
CompletableFuture.completedFuture(null)));
 
                                dispatcher1.start();
                                dispatcher2.start();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index c38ea5d99c0..950a4e13674 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -298,7 +298,7 @@ public void disconnectJobManager(JobID jobId, Exception 
cause) {
 
        @Override
        public CompletableFuture<ResourceOverview> requestResourceOverview(Time 
timeout) {
-               return FutureUtils.completedExceptionally(new 
UnsupportedOperationException("Not yet implemented"));
+               return CompletableFuture.completedFuture(new 
ResourceOverview(1, 1, 1));
        }
 
        @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to