tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r478403026



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,53 @@ public static ArchivedExecutionGraph 
createFrom(ExecutionGraph executionGraph) {
                        executionGraph.getCheckpointStatsSnapshot(),
                        executionGraph.getStateBackendName().orElse(null));
        }
+
+       /**
+        * Create a sparse ArchivedExecutionGraph for a job while it is still 
initializing.
+        * Most fields will be empty, only job status and error-related fields 
are set.
+        */
+       public static ArchivedExecutionGraph createFromInitializingJob(
+               JobID jobId,
+               String jobName,
+               JobStatus jobStatus,
+               @Nullable Throwable throwable,
+               long initializationTimestamp) {
+
+               long failureTime = System.currentTimeMillis();

Review comment:
       Move this into the if branch.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {

Review comment:
       nit: this is purely a matter of personal taste, but I would keep 
`Exception` on the same line as `throws`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", 
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+               
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
 is(exception));
+
+               Assert.assertTrue(dispatcherJob.closeAsync().isDone() && 
dispatcherJob.closeAsync().isCompletedExceptionally());
+       }
+
+       @Test
+       public void testCloseWhileInitializingSuccessfully() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               // set job running, so that we can cancel it
+               testContext.setRunning();
+
+               // assert future completes now
+               closeFuture.get();
+
+               // ensure the result future is complete (how it completes is up 
to the JobManager)
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneously() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               // assert future completes now
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+
+               // ensure the result future is complete
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.FAILED));
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneouslyForRecovery() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+               // ensure the result future is completing exceptionally when 
using RECOVERY execution
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       () -> dispatcherJob.getResultFuture().get());
+       }
+
+       @Test
+       public void testCloseWhileRunning() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // complete JobManager runner future to indicate to the 
DispatcherJob that the Runner has been initialized
+               testContext.setRunning();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               closeFuture.get();
+
+               // result future should complete exceptionally.
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testUnavailableJobMasterGateway() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               dispatcherJob.getJobMasterGateway();
+       }
+
+       private TestContext createTestContext(Dispatcher.ExecutionType type) {
+               final JobVertex testVertex = new JobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", 
testVertex);
+               CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+               DispatcherJob dispatcherJob = 
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+                       jobGraph.getJobID(), jobGraph.getName(), type);
+
+               return new TestContext(
+                       jobManagerRunnerCompletableFuture,
+                       dispatcherJob,
+                       jobGraph);
+       }
+
+       private static class TestContext {
+               private final CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture;
+               private final DispatcherJob dispatcherJob;
+               private final JobGraph jobGraph;
+               private final TestingJobMasterGateway 
mockRunningJobMasterGateway;
+               private final CompletableFuture<Acknowledge> cancellationFuture;
+
+               private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+               public TestContext(
+                       CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture,
+                       DispatcherJob dispatcherJob,
+                       JobGraph jobGraph) {
+                       this.jobManagerRunnerCompletableFuture = 
jobManagerRunnerCompletableFuture;
+                       this.dispatcherJob = dispatcherJob;
+                       this.jobGraph = jobGraph;
+
+                       this.cancellationFuture = new CompletableFuture<>();
+                       this.mockRunningJobMasterGateway = new 
TestingJobMasterGatewayBuilder()
+                               .setRequestJobSupplier(() -> 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
 "test", internalJobStatus, null, 1337)))
+                               .setRequestJobDetailsSupplier(() -> {
+                                       JobDetails jobDetails = new 
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+                                               new int[]{0, 0, 0, 0, 0, 0, 0, 
0, 0}, 0);
+                                       return 
CompletableFuture.completedFuture(jobDetails);
+                               })
+                               // once JobManagerRunner is initialized, 
complete result future with CANCELLED AEG and ack cancellation.
+                               .setCancelFunction(() -> {
+                                       internalJobStatus = 
JobStatus.CANCELLING;
+                                       return cancellationFuture;
+                               })
+                               .build();
+               }
+
+               public JobID getJobID() {
+                       return jobGraph.getJobID();
+               }
+
+               public void failInitialization(Throwable ex) {
+                       
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+               }
+
+               public DispatcherJob getDispatcherJob() {
+                       return dispatcherJob;
+               }
+
+               public void setRunning() {
+                       internalJobStatus = JobStatus.RUNNING;
+                       TestingJobManagerRunner jobManagerRunner =
+                               new TestingJobManagerRunner(getJobID(), false);
+                       
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);
+                       
jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+               }
+
+               public void finishJob() {
+                       try {
+                               internalJobStatus = JobStatus.FINISHED;
+                               
jobManagerRunnerCompletableFuture.get().getResultFuture()

Review comment:
       `CompletableFuture.join()` throws an unchecked `CompletionException` 
rather than the checked exceptions of `get()`, which might make this a bit 
simpler here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -544,10 +705,13 @@ public void 
testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
                        .build();
 
                dispatcher.start();
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
-               final CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
-               submissionFuture.get();
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());

Review comment:
       Why do we have to wait for this here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void 
testJobSubmissionWithPartialResourceConfigured() throws Exception {
                }
        }
 
+       @Test(timeout = 5_000L)
+       public void testNonBlockingJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status from status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // ensure correct JobDetails
+               MultipleJobsDetails multiDetails = 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+               Assert.assertEquals(1, multiDetails.getJobs().size());
+               Assert.assertEquals(blockingJobGraph.getJobID(), 
multiDetails.getJobs().iterator().next().getJobId());
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+
+               // wait till job is running
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+               dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT).get();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testInvalidCallDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // this call is supposed to fail
+               boolean exceptionSeen = false;

Review comment:
       By adding a `fail()` call after `dispatcherGateway.triggerSavepoint(...)`, 
we could get rid of `exceptionSeen`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
##########
@@ -31,11 +31,11 @@
 /**
  * {@link Dispatcher} implementation used for testing purposes.
  */
-class TestingDispatcher extends Dispatcher {
+public class TestingDispatcher extends Dispatcher {

Review comment:
       Please revert after updating the `ClientUtilsTest`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not 
updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = 
DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the 
job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is 
not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, 
jobName, executionType);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
 throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != 
DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = 
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+                               if (throwable == null) {
+                                       // Forward result future
+                                       
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+                               } else { // failure during initialization
+                                       if (executionType == 
Dispatcher.ExecutionType.RECOVERY) {
+                                               
jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       JobStatus.FAILED,
+                                                       throwable,
+                                                       
initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new 
int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not 
completed.
+        * The returned future will complete exceptionally if the 
JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return 
getJobMasterGateway().thenCompose(jobMasterGateway -> 
jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization 
requested for job {}. Job will be cancelled once JobManager has been 
initialized.", jobId);
+
+                               // cancel job
+                               jobManagerRunnerFuture
+                                       
.thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> 
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+                                       .whenComplete((ignored, 
cancelThrowable) -> {
+                                               if (cancelThrowable != null) {
+                                                       log.warn("Cancellation 
of job {} failed", jobId, cancelThrowable);
+                                               }
+                                       });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       if (archivedExecutionGraph != null) {
+                                               jobStatus = 
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                                       }
+                               }));
+                               return jobResultFuture.thenApply(ignored -> 
Acknowledge.get());
+                       }
+               }
+       }
+
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               return 
requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+       }
+
+       /**
+        * Returns a future completing to the ArchivedExecutionGraph of the job.
+        */
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(Time 
timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               if (jobResultFuture.isDone()) { // job is not 
running anymore
+                                       return jobResultFuture;
+                               }
+                               return 
getJobMasterGateway().thenCompose(jobMasterGateway -> 
jobMasterGateway.requestJob(
+                                       timeout));
+                       } else {
+                               Preconditions.checkState(this.jobStatus == 
DispatcherJobStatus.INITIALIZING || jobStatus == 
DispatcherJobStatus.CANCELLING);
+                               return CompletableFuture.completedFuture(
+                                       
ArchivedExecutionGraph.createFromInitializingJob(
+                                               jobId,
+                                               jobName,
+                                               jobStatus.asJobStatus(),
+                                               null,
+                                               initializationTimestamp));
+                       }
+               }
+       }
+
+       /**
+        * The job is initialized once the JobManager runner has been 
initialized.
+        * It is also initialized if the runner initialization failed, or if it 
has been
+        * canceled (and the cancellation is complete).
+        */
+       public boolean isInitialized() {
+               synchronized (lock) {
+                       return jobStatus == 
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+               }
+       }
+
+       /**
+        * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+        * This method will fail with an {@link IllegalStateException} if the 
job is not initialized.
+        * The returned future will complete exceptionally if the 
JobManagerRunner initialization failed.
+        * @return the {@link JobMasterGateway}.
+        */
+       public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+               Preconditions.checkState(
+                       isInitialized(),
+                       "JobMaster Gateway is not available during 
initialization");
+               return 
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               
FutureUtils.assertNoException(jobManagerRunnerFuture.handle((runner, throwable) 
-> {
+                       if (throwable == null) {
+                               // init was successful: close jobManager runner.
+                               CompletableFuture<Void> jobManagerRunnerClose = 
jobManagerRunnerFuture.thenCompose(
+                                       AutoCloseableAsync::closeAsync);
+                               FutureUtils.forward(jobManagerRunnerClose, 
terminationFuture);
+                       } else {
+                               // initialization has failed: forward failure.
+                               
terminationFuture.completeExceptionally(throwable);

Review comment:
       I might have given you bad advice here. Forwarding an initialization 
problem which is not an abnormal condition for the `Dispatcher` might lead to a 
failure reported during the shut down of the `Dispatcher`. I think it is fine 
to say that a failed job manager initialization is an acceptable condition and, 
hence, to simply complete the `terminationFuture` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void 
testJobSubmissionWithPartialResourceConfigured() throws Exception {
                }
        }
 
+       @Test(timeout = 5_000L)
+       public void testNonBlockingJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status from status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // ensure correct JobDetails
+               MultipleJobsDetails multiDetails = 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+               Assert.assertEquals(1, multiDetails.getJobs().size());
+               Assert.assertEquals(blockingJobGraph.getJobID(), 
multiDetails.getJobs().iterator().next().getJobId());
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+
+               // wait till job is running
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+               dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT).get();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testInvalidCallDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);

Review comment:
       Duplicated code. I'd suggest creating a method which can be reused.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -835,51 +796,50 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
        }
 
        @Nonnull
-       private <T> List<CompletableFuture<Optional<T>>> 
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> 
queryFunction) {
-               final int numberJobsRunning = jobManagerRunnerFutures.size();
+       private <T> List<CompletableFuture<Optional<T>>> 
queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>> 
queryFunction) {
 
                ArrayList<CompletableFuture<Optional<T>>> 
optionalJobInformation = new ArrayList<>(
-                       numberJobsRunning);
+                       runningJobs.size());
 
-               for (JobID jobId : jobManagerRunnerFutures.keySet()) {
-                       final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-                       final CompletableFuture<Optional<T>> optionalRequest = 
jobMasterGatewayFuture
-                               .thenCompose(queryFunction::apply)
-                               .handle((T value, Throwable throwable) -> 
Optional.ofNullable(value));
-
-                       optionalJobInformation.add(optionalRequest);
+               for (DispatcherJob job : runningJobs.values()) {
+                       final CompletableFuture<Optional<T>> queryResult = 
queryFunction.apply(job)
+                                       .handle((T value, Throwable t) -> 
Optional.ofNullable(value));
+                       optionalJobInformation.add(queryResult);
                }
                return optionalJobInformation;
        }
 
-       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, FunctionWithException<JobGraph, 
CompletableFuture<Void>, ?> action) {
+       private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
                final CompletableFuture<Void> jobManagerTerminationFuture = 
getJobTerminationFuture(jobId)
                        .exceptionally((Throwable throwable) -> {
-                               throw new CompletionException(
-                                       new DispatcherException(
-                                               String.format("Termination of 
previous JobManager for job %s failed. Cannot submit job under the same job 
id.", jobId),
-                                               throwable)); });
-
-               return jobManagerTerminationFuture.thenComposeAsync(
-                       FunctionUtils.uncheckedFunction((ignored) -> {
-                               jobManagerTerminationFutures.remove(jobId);
-                               return action.apply(jobGraph);
+                               if (!ExceptionUtils.findThrowable(throwable, 
JobInitializationException.class).isPresent()) {
+                                       throw new CompletionException(
+                                               new DispatcherException(
+                                                       
String.format("Termination of previous JobManager for job %s failed. Cannot 
submit job under the same job id.", jobId),
+                                                       throwable));
+                               }

Review comment:
       If we don't complete the `DispatcherJob.terminationFuture` with failures 
from the `JobManagerRunner`, then we don't have to do this special casing here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", 
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+               
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
 is(exception));
+
+               Assert.assertTrue(dispatcherJob.closeAsync().isDone() && 
dispatcherJob.closeAsync().isCompletedExceptionally());

Review comment:
       I am not sure whether this should be the case.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void 
testJobSubmissionWithPartialResourceConfigured() throws Exception {
                }
        }
 
+       @Test(timeout = 5_000L)
+       public void testNonBlockingJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status from status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // ensure correct JobDetails
+               MultipleJobsDetails multiDetails = 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+               Assert.assertEquals(1, multiDetails.getJobs().size());
+               Assert.assertEquals(blockingJobGraph.getJobID(), 
multiDetails.getJobs().iterator().next().getJobId());
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+
+               // wait till job is running
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+               dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT).get();

Review comment:
       What does this call test?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", 
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+               
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
 is(exception));
+
+               Assert.assertTrue(dispatcherJob.closeAsync().isDone() && 
dispatcherJob.closeAsync().isCompletedExceptionally());
+       }
+
+       @Test
+       public void testCloseWhileInitializingSuccessfully() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               // set job running, so that we can cancel it
+               testContext.setRunning();
+
+               // assert future completes now
+               closeFuture.get();
+
+               // ensure the result future is complete (how it completes is up 
to the JobManager)
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneously() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               // assert future completes now
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+
+               // ensure the result future is complete
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.FAILED));
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneouslyForRecovery() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+               // ensure the result future is completing exceptionally when 
using RECOVERY execution
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       () -> dispatcherJob.getResultFuture().get());
+       }
+
+       @Test
+       public void testCloseWhileRunning() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // complete JobManager runner future to indicate to the 
DispatcherJob that the Runner has been initialized
+               testContext.setRunning();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               closeFuture.get();
+
+               // result future should complete exceptionally.
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testUnavailableJobMasterGateway() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               dispatcherJob.getJobMasterGateway();
+       }
+
+       private TestContext createTestContext(Dispatcher.ExecutionType type) {
+               final JobVertex testVertex = new JobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", 
testVertex);
+               CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+               DispatcherJob dispatcherJob = 
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+                       jobGraph.getJobID(), jobGraph.getName(), type);
+
+               return new TestContext(
+                       jobManagerRunnerCompletableFuture,
+                       dispatcherJob,
+                       jobGraph);
+       }
+
+       private static class TestContext {
+               private final CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture;
+               private final DispatcherJob dispatcherJob;
+               private final JobGraph jobGraph;
+               private final TestingJobMasterGateway 
mockRunningJobMasterGateway;
+               private final CompletableFuture<Acknowledge> cancellationFuture;
+
+               private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+               public TestContext(
+                       CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture,
+                       DispatcherJob dispatcherJob,
+                       JobGraph jobGraph) {
+                       this.jobManagerRunnerCompletableFuture = 
jobManagerRunnerCompletableFuture;
+                       this.dispatcherJob = dispatcherJob;
+                       this.jobGraph = jobGraph;
+
+                       this.cancellationFuture = new CompletableFuture<>();
+                       this.mockRunningJobMasterGateway = new 
TestingJobMasterGatewayBuilder()
+                               .setRequestJobSupplier(() -> 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
 "test", internalJobStatus, null, 1337)))
+                               .setRequestJobDetailsSupplier(() -> {
+                                       JobDetails jobDetails = new 
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+                                               new int[]{0, 0, 0, 0, 0, 0, 0, 
0, 0}, 0);
+                                       return 
CompletableFuture.completedFuture(jobDetails);
+                               })
+                               // once JobManagerRunner is initialized, 
complete result future with CANCELLED AEG and ack cancellation.
+                               .setCancelFunction(() -> {
+                                       internalJobStatus = 
JobStatus.CANCELLING;
+                                       return cancellationFuture;
+                               })
+                               .build();
+               }
+
+               public JobID getJobID() {
+                       return jobGraph.getJobID();
+               }
+
+               public void failInitialization(Throwable ex) {
+                       
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+               }
+
+               public DispatcherJob getDispatcherJob() {
+                       return dispatcherJob;
+               }
+
+               public void setRunning() {
+                       internalJobStatus = JobStatus.RUNNING;
+                       TestingJobManagerRunner jobManagerRunner =
+                               new TestingJobManagerRunner(getJobID(), false);
+                       
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);
+                       
jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+               }
+
+               public void finishJob() {
+                       try {
+                               internalJobStatus = JobStatus.FINISHED;
+                               
jobManagerRunnerCompletableFuture.get().getResultFuture()
+                                       
.complete(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", 
JobStatus.FINISHED, null, 1337));

Review comment:
       Same hack as in `setRunning`. I'd suggest passing in a `jobResultFuture` 
to the `JobManagerRunner` which is created to complete 
`jobManagerRunnerCompletableFuture`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -419,19 +554,29 @@ public void testWaitingForJobMasterLeadership() throws 
Exception {
 
                dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-               final CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
 
                assertThat(jobStatusFuture.isDone(), is(false));

Review comment:
       Are you sure that this holds now? I thought it would return 
`JobStatus.INITIALIZING` at least.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", 
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+               
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
 is(exception));
+
+               Assert.assertTrue(dispatcherJob.closeAsync().isDone() && 
dispatcherJob.closeAsync().isCompletedExceptionally());
+       }
+
+       @Test
+       public void testCloseWhileInitializingSuccessfully() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               // set job running, so that we can cancel it
+               testContext.setRunning();
+
+               // assert future completes now
+               closeFuture.get();
+
+               // ensure the result future is complete (how it completes is up 
to the JobManager)
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneously() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               // assert future completes now
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+
+               // ensure the result future is complete
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.FAILED));
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneouslyForRecovery() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       closeFuture::get);
+               // ensure the result future is completing exceptionally when 
using RECOVERY execution
+               CommonTestUtils.assertThrows("fail", ExecutionException.class,
+                       () -> dispatcherJob.getResultFuture().get());
+       }
+
+       @Test
+       public void testCloseWhileRunning() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // complete JobManager runner future to indicate to the 
DispatcherJob that the Runner has been initialized
+               testContext.setRunning();
+
+               CompletableFuture<Void> closeFuture = 
dispatcherJob.closeAsync();
+
+               closeFuture.get();
+
+               // result future should complete exceptionally.
+               CompletableFuture<ArchivedExecutionGraph> resultFuture = 
dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", 
ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testUnavailableJobMasterGateway() {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               dispatcherJob.getJobMasterGateway();
+       }
+
+       private TestContext createTestContext(Dispatcher.ExecutionType type) {
+               final JobVertex testVertex = new JobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", 
testVertex);
+               CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+               DispatcherJob dispatcherJob = 
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+                       jobGraph.getJobID(), jobGraph.getName(), type);
+
+               return new TestContext(
+                       jobManagerRunnerCompletableFuture,
+                       dispatcherJob,
+                       jobGraph);
+       }
+
+       private static class TestContext {
+               private final CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture;
+               private final DispatcherJob dispatcherJob;
+               private final JobGraph jobGraph;
+               private final TestingJobMasterGateway 
mockRunningJobMasterGateway;
+               private final CompletableFuture<Acknowledge> cancellationFuture;
+
+               private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+               public TestContext(
+                       CompletableFuture<JobManagerRunner> 
jobManagerRunnerCompletableFuture,
+                       DispatcherJob dispatcherJob,
+                       JobGraph jobGraph) {
+                       this.jobManagerRunnerCompletableFuture = 
jobManagerRunnerCompletableFuture;
+                       this.dispatcherJob = dispatcherJob;
+                       this.jobGraph = jobGraph;
+
+                       this.cancellationFuture = new CompletableFuture<>();
+                       this.mockRunningJobMasterGateway = new 
TestingJobMasterGatewayBuilder()
+                               .setRequestJobSupplier(() -> 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
 "test", internalJobStatus, null, 1337)))
+                               .setRequestJobDetailsSupplier(() -> {
+                                       JobDetails jobDetails = new 
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+                                               new int[]{0, 0, 0, 0, 0, 0, 0, 
0, 0}, 0);
+                                       return 
CompletableFuture.completedFuture(jobDetails);
+                               })
+                               // once JobManagerRunner is initialized, 
complete result future with CANCELLED AEG and ack cancellation.
+                               .setCancelFunction(() -> {
+                                       internalJobStatus = 
JobStatus.CANCELLING;
+                                       return cancellationFuture;
+                               })
+                               .build();
+               }
+
+               public JobID getJobID() {
+                       return jobGraph.getJobID();
+               }
+
+               public void failInitialization(Throwable ex) {
+                       
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+               }
+
+               public DispatcherJob getDispatcherJob() {
+                       return dispatcherJob;
+               }
+
+               public void setRunning() {
+                       internalJobStatus = JobStatus.RUNNING;
+                       TestingJobManagerRunner jobManagerRunner =
+                               new TestingJobManagerRunner(getJobID(), false);
+                       
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);

Review comment:
       This is a hack. I would suggest adding a 
`TestingJobManagerRunnerBuilder` where one can set the result of the 
`getJobMasterGateway()` function.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void 
testJobSubmissionWithPartialResourceConfigured() throws Exception {
                }
        }
 
+       @Test(timeout = 5_000L)
+       public void testNonBlockingJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status from status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // ensure correct JobDetails
+               MultipleJobsDetails multiDetails = 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+               Assert.assertEquals(1, multiDetails.getJobs().size());
+               Assert.assertEquals(blockingJobGraph.getJobID(), 
multiDetails.getJobs().iterator().next().getJobId());
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+
+               // wait till job is running
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+               dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT).get();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testInvalidCallDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // this call is supposed to fail
+               boolean exceptionSeen = false;
+               try {
+                       
dispatcherGateway.triggerSavepoint(blockingJobGraph.getJobID(), 
"file:///tmp/savepoint", false, TIMEOUT).get();
+               } catch (ExecutionException t) {
+                       Assert.assertTrue(t.getCause() instanceof 
UnavailableDispatcherOperationException);
+                       exceptionSeen = true;
+               }
+               Assert.assertTrue(exceptionSeen);
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testCancellationDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);

Review comment:
       Duplicated code.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -285,12 +299,14 @@ public void tearDown() throws Exception {
        @Test
        public void testJobSubmission() throws Exception {
                dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
-
+               // grant leadership to job master so that we can call 
dispatcherGateway.requestJobStatus().
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
                DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-               CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-               acknowledgeFuture.get();
+               CommonTestUtils.waitUntilCondition(() -> 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == 
JobStatus.RUNNING,
+                       Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 
20L);

Review comment:
       Why don't we wait on `jobMasterLeaderElectionService.getStartFuture` 
instead of doing busy polling?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -603,6 +769,9 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() 
throws Exception {
                        .build();
                dispatcher.start();
 
+               // we need to elect a jobmaster leader to be able to cancel the 
job on the JobMaster.
+               
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();

Review comment:
       Why is this needed? Why do we need to cancel the job here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -561,10 +725,12 @@ public void 
testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        @Test
        public void testJobSuspensionWhenDispatcherIsTerminated() throws 
Exception {
                dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
 
                DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
                dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());

Review comment:
       Why do we have to wait on this here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -509,28 +657,41 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
                                if (exception != null) {
                                        throw exception;
                                }
-                       }));
+                       }, getMockedRunningJobMasterGateway()));
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-               CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-               assertThat(submissionFuture.isDone(), is(false));
+               
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
                queue.offer(Optional.of(testException));
 
-               try {
-                       submissionFuture.get();
-                       fail("Should fail because we could not instantiate the 
JobManagerRunner.");
-               } catch (Exception e) {
-                       assertThat(ExceptionUtils.findThrowable(e, t -> 
t.equals(testException)).isPresent(), is(true));
-               }
-
-               submissionFuture = dispatcherGateway.submitJob(jobGraph, 
TIMEOUT);
+               // wait till job is failed
+               JobStatus status;
+               do {
+                       status = dispatcherGateway.requestJobStatus(
+                               jobGraph.getJobID(),
+                               TIMEOUT).get();
+                       Assert.assertThat(
+                               status,
+                               
either(Matchers.is(JobStatus.INITIALIZING)).or(Matchers.is(JobStatus.FAILED)));
+                       Thread.sleep(20);
+               } while (status != JobStatus.FAILED);

Review comment:
       Waiting on `dispatcherGateway.requestJobResult` could save us the busy 
waiting loop.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
##########
@@ -181,17 +186,21 @@ private void testBlobServerCleanup(final TestCase 
testCase) throws Exception {
                }
 
                final CompletableFuture<JobSubmissionResult> submissionFuture = 
miniCluster.submitJob(jobGraph);
+               final JobSubmissionResult jobSubmissionResult = 
submissionFuture.get();
+
+               // wait until job is submitted
+               CommonTestUtils.waitUntilCondition(() -> 
miniCluster.getJobStatus(jobGraph.getJobID()).get() != JobStatus.INITIALIZING,
+                       Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 
20L);

Review comment:
       Why do we have to wait on this?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -617,16 +786,23 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() 
throws Exception {
 
                @Nonnull
                private final ThrowingRunnable<Exception> 
jobManagerRunnerCreationLatch;
+               @Nullable
+               private final JobMasterGateway jobMasterGateway;
 
-               BlockingJobManagerRunnerFactory(@Nonnull 
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+               BlockingJobManagerRunnerFactory(@Nonnull 
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch, @Nullable 
JobMasterGateway jobMasterGateway) {
                        this.jobManagerRunnerCreationLatch = 
jobManagerRunnerCreationLatch;
+                       this.jobMasterGateway = jobMasterGateway;
                }
 
                @Override
                public TestingJobManagerRunner createJobManagerRunner(JobGraph 
jobGraph, Configuration configuration, RpcService rpcService, 
HighAvailabilityServices highAvailabilityServices, HeartbeatServices 
heartbeatServices, JobManagerSharedServices jobManagerSharedServices, 
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, 
FatalErrorHandler fatalErrorHandler) throws Exception {
                        jobManagerRunnerCreationLatch.run();
 
-                       return super.createJobManagerRunner(jobGraph, 
configuration, rpcService, highAvailabilityServices, heartbeatServices, 
jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+                       TestingJobManagerRunner runner = 
super.createJobManagerRunner(jobGraph, configuration, rpcService, 
highAvailabilityServices, heartbeatServices, jobManagerSharedServices, 
jobManagerJobMetricGroupFactory, fatalErrorHandler);
+                       if (jobMasterGateway != null) {
+                               
runner.getJobMasterGateway().complete(jobMasterGateway);
+                       }

Review comment:
       Let's not use this hack here. We don't know whether 
`getJobMasterGateway` will always return the same future.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -690,4 +866,66 @@ public JobManagerRunner createJobManagerRunner(
                }
        }
 
+       private static class BlockingJobVertex extends JobVertex {
+               private final Object lock = new Object();
+               private boolean blocking = true;
+               public BlockingJobVertex(String name) {
+                       super(name);
+               }
+
+               @Override
+               public void initializeOnMaster(ClassLoader loader) throws 
Exception {
+                       super.initializeOnMaster(loader);
+
+                       while (true) {
+                               synchronized (lock) {
+                                       if (!blocking) {
+                                               return;
+                                       }
+                                       lock.wait(10);
+                               }
+                       }

Review comment:
       I'd suggest using a `OneShotLatch` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -472,23 +618,25 @@ public void testBlockingJobManagerRunner() throws 
Exception {
                dispatcher = createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
-                       new 
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
-
+                       new 
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await, 
getMockedRunningJobMasterGateway()));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-               final CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-               assertThat(submissionFuture.isDone(), is(false));
+               
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
                final CompletableFuture<Collection<String>> 
metricQueryServiceAddressesFuture = 
dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds(5L));
 
                assertThat(metricQueryServiceAddressesFuture.get(), 
is(empty()));
 
-               assertThat(submissionFuture.isDone(), is(false));
+               
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
                jobManagerRunnerCreationLatch.trigger();
 
-               submissionFuture.get();
+               CommonTestUtils.waitUntilCondition(() -> 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == 
JobStatus.RUNNING,
+                       Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 
5L);
+               
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT).get(), is(JobStatus.RUNNING));

Review comment:
       That's a bit redundant ("doppelt gemoppelt"): the assertion repeats the condition we just waited for.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -419,19 +554,29 @@ public void testWaitingForJobMasterLeadership() throws 
Exception {
 
                dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-               final CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
 
                assertThat(jobStatusFuture.isDone(), is(false));
 
-               try {
-                       jobStatusFuture.get(10, TimeUnit.MILLISECONDS);
-                       fail("Should not complete.");
-               } catch (TimeoutException ignored) {
-                       // ignored
+               // Make sure that the jobstatus request is blocking after it 
has left the INITIALIZING status
+               boolean timeoutSeen = false;
+               while (!timeoutSeen) {
+                       try {
+                               jobStatusFuture = 
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+                               JobStatus jobStatus = jobStatusFuture.get(10, 
TimeUnit.MILLISECONDS);
+                               if (jobStatus != JobStatus.INITIALIZING) {
+                                       fail("Should not complete.");
+                               } else {
+                                       Thread.sleep(10); // give more time to 
initialize
+                               }
+                       } catch (TimeoutException ignored) {
+                               timeoutSeen = true;
+                       }
                }
-
+               // Job is initialized. Make the master leader
                
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
+               // ensure that the future is now completing

Review comment:
       Maybe it would be better to test the completion of the leader future 
directly on the `JobManagerRunnerImpl`.
   
   I have to admit that I am not a huge fan of busy waiting loops. Whenever one 
sees one, one should think about whether the test can be expressed differently.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
##########
@@ -166,21 +167,28 @@ public void testResourceCleanupUnderLeadershipChange() 
throws Exception {
                                defaultDispatcherRunnerFactory)) {
 
                                // initial run
-                               DispatcherGateway dispatcherGateway = 
grantLeadership(dispatcherLeaderElectionService);
+                               final DispatcherGateway dispatcherGateway = 
grantLeadership(dispatcherLeaderElectionService);
 
                                LOG.info("Initial job submission {}.", 
jobGraph.getJobID());
                                dispatcherGateway.submitJob(jobGraph, 
TESTING_TIMEOUT).get();
 
+                               // wait until job is running
+                               CommonTestUtils.waitUntilCondition(() ->
+                                       
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TESTING_TIMEOUT).get() 
== JobStatus.RUNNING, Deadline.fromNow(VERIFICATION_TIMEOUT), 20L);

Review comment:
       Why is this required here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void 
testJobSubmissionWithPartialResourceConfigured() throws Exception {
                }
        }
 
+       @Test(timeout = 5_000L)
+       public void testNonBlockingJobSubmission() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status from status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // ensure correct JobDetails
+               MultipleJobsDetails multiDetails = 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+               Assert.assertEquals(1, multiDetails.getJobs().size());
+               Assert.assertEquals(blockingJobGraph.getJobID(), 
multiDetails.getJobs().iterator().next().getJobId());
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+
+               // wait till job is running
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+               dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT).get();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testInvalidCallDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // ensure INITIALIZING status
+               CompletableFuture<JobStatus> jobStatusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+               Assert.assertEquals(JobStatus.INITIALIZING, 
jobStatusFuture.get());
+
+               // this call is supposed to fail
+               boolean exceptionSeen = false;
+               try {
+                       
dispatcherGateway.triggerSavepoint(blockingJobGraph.getJobID(), 
"file:///tmp/savepoint", false, TIMEOUT).get();
+               } catch (ExecutionException t) {
+                       Assert.assertTrue(t.getCause() instanceof 
UnavailableDispatcherOperationException);
+                       exceptionSeen = true;
+               }
+               Assert.assertTrue(exceptionSeen);
+
+               // submission has succeeded, let the initialization finish.
+               blockingJobVertex.unblock();
+       }
+
+       @Test(timeout = 5_000L)
+       public void testCancellationDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph of a job that blocks forever
+               final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+               blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"blockingTestJob", blockingJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
 TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+               // submission has succeeded, now cancel the job
+               CompletableFuture<Acknowledge> cancellationFuture = 
dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), TIMEOUT);
+               
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
 TIMEOUT).get(), is(JobStatus.CANCELLING));
+               Assert.assertThat(cancellationFuture.isDone(), is(false));
+               // unblock
+               blockingJobVertex.unblock();
+               // wait until cancelled
+               cancellationFuture.get();
+               
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
 TIMEOUT).get(), is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testErrorDuringInitialization() throws Exception {
+               dispatcher = createAndStartDispatcher(heartbeatServices, 
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, 
createdJobManagerRunnerLatch));
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // create a job graph that fails during initialization
+               final FailingInitializationJobVertex 
failingInitializationJobVertex = new FailingInitializationJobVertex(
+                       "testVertex");
+               
failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
+               JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID, 
"failingTestJob", failingInitializationJobVertex);
+
+               dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+               // wait till job has failed
+               JobStatus status;
+               do {
+                       CompletableFuture<JobStatus> statusFuture = 
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+                       status = statusFuture.get();
+                       Thread.sleep(50);
+                       Assert.assertThat(status, 
either(is(JobStatus.INITIALIZING)).or(is(JobStatus.FAILED)));
+               } while (status != JobStatus.FAILED);

Review comment:
       Can't we replace this with `dispatcherGateway.requestJobResult` and then 
wait on the returned future?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
##########
@@ -134,14 +135,13 @@ public void 
leaderChange_afterJobSubmission_recoversSubmittedJob() throws Except
                        final UUID firstLeaderSessionId = UUID.randomUUID();
 
                        final DispatcherGateway firstDispatcherGateway = 
electLeaderAndRetrieveGateway(firstLeaderSessionId);
-
                        firstDispatcherGateway.submitJob(jobGraph, 
TIMEOUT).get();
+                       CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
firstDispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());

Review comment:
       Why is this needed?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
##########
@@ -103,6 +104,8 @@ public void testReelectionOfDispatcher() throws Exception {
 
                submissionFuture.get();
 
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
miniCluster.getJobStatus(jobId).get());

Review comment:
       Why is this required?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), 
is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = 
dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(), 
is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws
+               Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", 
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = 
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+               DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial 
failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = 
dispatcherJob.getResultFuture().get();
+               
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
 is(exception));
+
+               Assert.assertTrue(dispatcherJob.closeAsync().isDone() && 
dispatcherJob.closeAsync().isCompletedExceptionally());

Review comment:
       This condition is actually covered by 
`testCloseWhileInitializingErroneously`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,53 @@ public static ArchivedExecutionGraph 
createFrom(ExecutionGraph executionGraph) {
                        executionGraph.getCheckpointStatsSnapshot(),
                        executionGraph.getStateBackendName().orElse(null));
        }
+
+       /**
+        * Create a sparse ArchivedExecutionGraph for a job while it is still 
initializing.
+        * Most fields will be empty, only job status and error-related fields 
are set.
+        */
+       public static ArchivedExecutionGraph createFromInitializingJob(
+               JobID jobId,
+               String jobName,
+               JobStatus jobStatus,
+               @Nullable Throwable throwable,
+               long initializationTimestamp) {
+
+               long failureTime = System.currentTimeMillis();

Review comment:
       It does not matter if this will cost us 5ms.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
##########
@@ -135,6 +138,8 @@ public void testReelectionOfJobMaster() throws Exception {
 
                submissionFuture.get();
 
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
miniCluster.getJobStatus(jobId).get());

Review comment:
       Same here?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = DEBUG

Review comment:
       Please revert

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
##########
@@ -132,6 +132,7 @@ public void testJobRecoveryWithFailingTaskExecutor() throws 
Exception {
        private CompletableFuture<JobResult> 
submitJobAndWaitUntilRunning(JobGraph jobGraph) throws Exception {
                miniCluster.submitJob(jobGraph).get();
 
+               CommonTestUtils.waitUntilJobManagerIsInitialized(() -> 
miniCluster.getJobStatus(jobGraph.getJobID()).get());

Review comment:
       Why is this needed here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to