tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r481192301



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
##########
@@ -122,6 +123,14 @@ public EmbeddedExecutor(
                                timeout);
 
                return jobSubmissionFuture
+                               .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
+                                       org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+                                               () -> dispatcherGateway.requestJobStatus(jobId, timeout).get(),
+                                               () -> dispatcherGateway.requestJobResult(jobId, timeout).get(),
+                                               userCodeClassloader
+                                       );

Review comment:
       nit: `);` could be on the previous line.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -516,59 +498,52 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
 
        @Override
        public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
-
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               final CompletableFuture<JobStatus> jobStatusFuture = jobMasterGatewayFuture.thenCompose(
-                       (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
-
-               return jobStatusFuture.exceptionally(
-                       (Throwable throwable) -> {
-                               final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-                               // check whether it is a completed job
-                               if (jobDetails == null) {
-                                       throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-                               } else {
-                                       return jobDetails.getStatus();
-                               }
-                       });
+               Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+               if (maybeJob.isPresent()) {
+                       return maybeJob.get().requestJobStatus(timeout);
+               } else {
+                       // is it a completed job?
+                       final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+                       if (jobDetails == null) {
+                               return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+                       } else {
+                               return CompletableFuture.completedFuture(jobDetails.getStatus());
+                       }
+               }
        }
 
        @Override
        public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
                        final JobID jobId,
                        final JobVertexID jobVertexId) {
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
+               return performOperationOnJobMasterGateway(jobId, gateway -> gateway.requestOperatorBackPressureStats(jobVertexId));
        }
 
        @Override
        public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               final CompletableFuture<ArchivedExecutionGraph> archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(
-                       (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout));
-
-               return archivedExecutionGraphFuture.exceptionally(
-                       (Throwable throwable) -> {
-                               final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId);
-
-                               // check whether it is a completed job
-                               if (serializableExecutionGraph == null) {
-                                       throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-                               } else {
-                                       return serializableExecutionGraph;
-                               }
-                       });
+               Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreeOnException = throwable ->  {

Review comment:
       ```suggestion
                Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException = throwable ->  {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {
+       private final boolean initializationFailure;
+       private final ArchivedExecutionGraph archivedExecutionGraph;
+       @Nullable
+       private final Throwable throwable;
+
+       private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable, boolean initializationFailure) {
+               this.archivedExecutionGraph = archivedExecutionGraph;
+               this.initializationFailure = initializationFailure;
+               this.throwable = throwable;
+       }
+
+       public boolean isInitializationFailure() {
+               return initializationFailure;
+       }
+
+       public ArchivedExecutionGraph getArchivedExecutionGraph() {
+               return archivedExecutionGraph;
+       }
+
+       public Throwable getThrowable() {

Review comment:
       ```suggestion
        public Throwable getInitializationFailure() {
   ```
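A minimal sketch of the container with the suggested rename applied, assuming a generic type `G` in place of `ArchivedExecutionGraph` so it compiles without Flink on the classpath. `forSuccess` mirrors the factory used in `DispatcherJob`; the class name `DispatcherJobResultSketch` and the `forInitializationFailure` factory are hypothetical. Deriving `isInitializationFailure()` from the nullable throwable (instead of keeping a separate boolean, as the PR does) is a design choice of this sketch only.

```java
import java.util.Objects;

/** Hypothetical sketch; G stands in for ArchivedExecutionGraph. */
final class DispatcherJobResultSketch<G> {
    private final G archivedExecutionGraph;
    // Non-null only if the job failed during initialization.
    private final Throwable initializationFailure;

    private DispatcherJobResultSketch(G archivedExecutionGraph, Throwable initializationFailure) {
        this.archivedExecutionGraph = Objects.requireNonNull(archivedExecutionGraph);
        this.initializationFailure = initializationFailure;
    }

    static <G> DispatcherJobResultSketch<G> forSuccess(G archivedExecutionGraph) {
        return new DispatcherJobResultSketch<>(archivedExecutionGraph, null);
    }

    // Hypothetical factory for the failure case.
    static <G> DispatcherJobResultSketch<G> forInitializationFailure(G archivedExecutionGraph, Throwable cause) {
        return new DispatcherJobResultSketch<>(archivedExecutionGraph, Objects.requireNonNull(cause));
    }

    boolean isInitializationFailure() {
        return initializationFailure != null;
    }

    G getArchivedExecutionGraph() {
        return archivedExecutionGraph;
    }

    // Renamed from getThrowable(); fails fast if the job did not fail during initialization.
    Throwable getInitializationFailure() {
        if (initializationFailure == null) {
            throw new IllegalStateException("Job did not fail during initialization");
        }
        return initializationFailure;
    }
}
```

With this shape, callers check `isInitializationFailure()` before asking for the failure, and the getter name says what the throwable means.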

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
##########
@@ -80,19 +75,6 @@ private static JobGraph getWorkingJobGraph() {
                return new JobGraph("Working testing job", jobVertex);
        }
 
-       // --------------------------------------------------------------------------------------------
-
-       private final boolean detached;
-
-       public JobSubmissionFailsITCase(boolean detached) {
-               this.detached = detached;
-       }
-
-       @Parameterized.Parameters(name = "Detached mode = {0}")
-       public static Collection<Boolean[]> executionModes(){
-               return Arrays.asList(new Boolean[]{false},
-                               new Boolean[]{true});
-       }

Review comment:
       Why can we remove the detached test case here? Aren't we losing test coverage by removing this code?

##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +120,42 @@ public static void executeProgram(
                        Thread.currentThread().setContextClassLoader(contextClassLoader);
                }
        }
+
+       /**
+        * This method blocks until the job status is not INITIALIZING anymore.
+        * @param jobStatusSupplier supplier returning the job status.
+        * @param jobResultSupplier supplier returning the job result. This will only be called if the job reaches the FAILED state.
+        * @throws JobInitializationException If the initialization failed or RuntimeException if this method has an internal error.

Review comment:
       ```suggestion
         * @throws JobInitializationException If the initialization failed
   ```
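The Javadoc above describes a polling contract: keep asking for the status while it is `INITIALIZING`, and consult the job result only once the job is `FAILED`. A minimal sketch of that contract, assuming a fixed poll interval and plain `Supplier`s. `SketchJobStatus`, `SketchInitializationException` and `InitializationWaiter` are hypothetical stand-ins for `JobStatus`, `JobInitializationException` and `ClientUtils`; the real method presumably also uses its `userCodeClassloader` argument to deserialize the failure, which this sketch skips.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical, reduced stand-in for Flink's JobStatus.
enum SketchJobStatus { INITIALIZING, RUNNING, FAILED }

// Hypothetical stand-in for JobInitializationException.
class SketchInitializationException extends Exception {
    SketchInitializationException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class InitializationWaiter {
    private InitializationWaiter() {}

    /**
     * Blocks while the job is INITIALIZING. The failure supplier is only called if the
     * job ends up FAILED; an initialization failure is rethrown, any other failure
     * (the job failed after initialization) is left for the caller to observe later.
     */
    static void waitUntilInitializationFinished(
            Supplier<SketchJobStatus> jobStatusSupplier,
            Supplier<Throwable> jobFailureSupplier,
            long pollIntervalMillis) throws SketchInitializationException, InterruptedException {
        SketchJobStatus status = jobStatusSupplier.get();
        while (status == SketchJobStatus.INITIALIZING) {
            TimeUnit.MILLISECONDS.sleep(pollIntervalMillis); // fixed interval: an assumption of this sketch
            status = jobStatusSupplier.get();
        }
        if (status == SketchJobStatus.FAILED) {
            Throwable failure = jobFailureSupplier.get();
            if (failure instanceof SketchInitializationException) {
                throw (SketchInitializationException) failure;
            }
        }
    }
}
```

Feeding it the same status sequence as `ClientUtilsTest` (`INITIALIZING`, `INITIALIZING`, `FAILED`) makes it rethrow the initialization failure, while a job that goes straight to `RUNNING` never touches the failure supplier.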

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -516,59 +498,52 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
 
        @Override
        public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
-
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               final CompletableFuture<JobStatus> jobStatusFuture = jobMasterGatewayFuture.thenCompose(
-                       (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
-
-               return jobStatusFuture.exceptionally(
-                       (Throwable throwable) -> {
-                               final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-                               // check whether it is a completed job
-                               if (jobDetails == null) {
-                                       throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-                               } else {
-                                       return jobDetails.getStatus();
-                               }
-                       });
+               Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+               if (maybeJob.isPresent()) {
+                       return maybeJob.get().requestJobStatus(timeout);
+               } else {
+                       // is it a completed job?
+                       final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+                       if (jobDetails == null) {
+                               return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+                       } else {
+                               return CompletableFuture.completedFuture(jobDetails.getStatus());
+                       }
+               }
        }
 
        @Override
        public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
                        final JobID jobId,
                        final JobVertexID jobVertexId) {
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
+               return performOperationOnJobMasterGateway(jobId, gateway -> gateway.requestOperatorBackPressureStats(jobVertexId));
        }
 
        @Override
        public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               final CompletableFuture<ArchivedExecutionGraph> archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(
-                       (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout));
-
-               return archivedExecutionGraphFuture.exceptionally(
-                       (Throwable throwable) -> {
-                               final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId);
-
-                               // check whether it is a completed job
-                               if (serializableExecutionGraph == null) {
-                                       throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-                               } else {
-                                       return serializableExecutionGraph;
-                               }
-                       });
+               Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreeOnException = throwable ->  {
+                       // check whether it is a completed job
+                       final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
+                       if (archivedExecutionGraph == null) {
+                               throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
+                       } else {
+                               return archivedExecutionGraph;
+                       }
+               };
+               Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+               if (maybeJob.isPresent()) {
+                       DispatcherJob job = maybeJob.get();
+                       return job.requestJob(timeout).exceptionally(checkExecutionGraphStoreeOnException);
+               } else {
+                       return CompletableFuture.completedFuture(checkExecutionGraphStoreeOnException.apply(new FlinkJobNotFoundException(jobId)));
+               }

Review comment:
       A different way to express this logic could be:
   
   ```
   return maybeJob.map(job -> job.requestJob(timeout))
                        .orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)))
                        .exceptionally(checkExecutionGraphStoreOnException);
   ```
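A self-contained sketch of the suggested `Optional` chain, using plain `CompletableFuture`s and `Map`s in place of the Dispatcher's job registry and `archivedExecutionGraphStore`. `RunningJob`, `JobLookupSketch`, the `String` results and the `IllegalStateException` (standing in for `FlinkJobNotFoundException`) are hypothetical. It uses `orElseGet` rather than the suggested `orElse`: both work, but `orElseGet` only builds the failed future and its exception when no running job exists. Routing both cases through `exceptionally` also means an unknown job surfaces as a failed future; as far as the quoted `else` branch shows, `apply` there would throw the `CompletionException` directly out of `requestJob`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical stand-in for DispatcherJob: answers requests asynchronously.
interface RunningJob {
    CompletableFuture<String> requestJob();
}

final class JobLookupSketch {
    private final Map<String, RunningJob> runningJobs; // stands in for the Dispatcher's running jobs
    private final Map<String, String> archive;         // stands in for archivedExecutionGraphStore

    JobLookupSketch(Map<String, RunningJob> runningJobs, Map<String, String> archive) {
        this.runningJobs = runningJobs;
        this.archive = archive;
    }

    CompletableFuture<String> requestJob(String jobId) {
        // On any failure, fall back to the archive; rethrow if the job is unknown there too.
        Function<Throwable, String> checkArchiveOnException = throwable -> {
            String archived = archive.get(jobId);
            if (archived == null) {
                Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                        ? throwable.getCause()
                        : throwable;
                throw new CompletionException(cause);
            }
            return archived;
        };

        return Optional.ofNullable(runningJobs.get(jobId))
                .map(RunningJob::requestJob)
                .orElseGet(() -> failedFuture(new IllegalStateException("Job not found: " + jobId)))
                .exceptionally(checkArchiveOnException);
    }

    // Builds an exceptionally completed future without relying on Java 9+ APIs.
    private static <T> CompletableFuture<T> failedFuture(Throwable throwable) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(throwable);
        return future;
    }
}
```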

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+       /**
+        * Ensure that the waitUntilJobInitializationFinished() method throws JobInitializationException.
+        */
+       @Test
+       public void testWaitUntilJobInitializationFinished_throwsInitializationException() {
+               JobID jobID = new JobID();
+
+               Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+                       JobStatus.INITIALIZING,
+                       JobStatus.INITIALIZING,
+                       JobStatus.FAILED).iterator();
+
+               CommonTestUtils.assertThrows("Something is wrong", JobInitializationException.class, () -> {
+                       ClientUtils.waitUntilJobInitializationFinished(
+                               statusSequenceIterator::next, () -> {

Review comment:
       A line break after `statusSequenceIterator::next,` could make the code more readable.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,122 @@
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+       /**
+        * Ensure that the waitUntilJobInitializationFinished() method throws JobInitializationException.
+        */
+       @Test
+       public void testWaitUntilJobInitializationFinished_throwsInitializationException() {
+               JobID jobID = new JobID();
+
+               Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+                       JobStatus.INITIALIZING,
+                       JobStatus.INITIALIZING,
+                       JobStatus.FAILED).iterator();
+
+               CommonTestUtils.assertThrows("Something is wrong", JobInitializationException.class, () -> {
+                       ClientUtils.waitUntilJobInitializationFinished(
+                               statusSequenceIterator::next, () -> {
+                                       SerializedThrowable throwable = new SerializedThrowable(new JobInitializationException(
+                                               jobID,
+                                               "Something is wrong",
+                                               new RuntimeException("Err")));
+                                       return new JobResult.Builder().jobId(jobID).serializedThrowable(throwable).netRuntime(1).build();

Review comment:
       I'd move this out of the lambda to make it a bit easier to read.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -687,7 +840,39 @@ public JobManagerRunner createJobManagerRunner(
                                jobManagerSharedServices,
                                jobManagerJobMetricGroupFactory,
                                fatalErrorHandler);
+
+                       return jobManagerRunner;

Review comment:
       nit: this could be reverted.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+
+                               if (throwable == null) { // initialization succeeded
+                                       // Forward result future
+                                       jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
+                                               if (archivedExecutionGraph != null) {
+                                                       jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
+                                               } else {
+                                                       jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
+                                               }
+                                       });

Review comment:
       One could express this block a bit more succinctly:
   
   ```
   FutureUtils.forward(
        jobManagerRunner.getResultFuture().thenApply(DispatcherJobResult::forSuccess),
        jobResultFuture);
   ```
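To make the suggestion concrete, here is a minimal sketch of the forwarding step on plain `CompletableFuture`s. `ForwardSketch.forward` is a hypothetical helper written to behave the way `FutureUtils.forward` is used above (complete the target with the source's value or exception); Flink's actual implementation should be checked before relying on details. One difference worth verifying: the quoted block strips the `CompletionException` before completing `jobResultFuture`, whereas forwarding a `thenApply` stage, at least in this sketch, hands the target the `CompletionException` wrapper. `join()` rethrows that wrapper, while `get()` unwraps it into an `ExecutionException` whose cause is the original failure.

```java
import java.util.concurrent.CompletableFuture;

final class ForwardSketch {
    private ForwardSketch() {}

    /** Completes target with whatever source completes with: a value or an exception. */
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, throwable) -> {
            if (throwable != null) {
                target.completeExceptionally(throwable);
            } else {
                target.complete(value);
            }
        });
    }
}
```

Applied to the review suggestion, `source` would be `jobManagerRunner.getResultFuture().thenApply(DispatcherJobResult::forSuccess)` and `target` the `jobResultFuture`, which replaces the hand-written `whenComplete` branching.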

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
##########
@@ -81,13 +82,20 @@ private PerJobMiniClusterFactory(
        /**
         * Starts a {@link MiniCluster} and submits a job.
         */
-       public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
+       public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
                MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
                MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
                miniCluster.start();
 
                return miniCluster
                        .submitJob(jobGraph)
+                       .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+                               org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+                                       () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+                                       () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+                                       userCodeClassloader);
+                               return submissionResult;

Review comment:
       Do we also have to do this for `MiniCluster.executeJobBlocking`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {
+       private final boolean initializationFailure;
+       private final ArchivedExecutionGraph archivedExecutionGraph;
+       @Nullable
+       private final Throwable throwable;
+
+       private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable, boolean initializationFailure) {

Review comment:
       ```suggestion
        private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable, boolean initializationFailure) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,263 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+
+                               if (throwable == null) { // initialization succeeded
+                                       // Forward result future
+                                       jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
+                                               if (archivedExecutionGraph != null) {
+                                                       jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
+                                               } else {
+                                                       jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
+                                               }
+                                       });
+                               } else { // failure during initialization
+                                       final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+                                       ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFromInitializingJob(
+                                               jobId,
+                                               jobName,
+                                               JobStatus.FAILED,
+                                               strippedThrowable,
+                                               initializationTimestamp);
+                                       jobResultFuture.complete(DispatcherJobResult.forInitializationFailure(archivedExecutionGraph, strippedThrowable));
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<DispatcherJobResult> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               CompletableFuture<Acknowledge> cancelFuture = jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT));
+                               cancelFuture.whenComplete((ignored, cancelThrowable) -> {
+                                       if (cancelThrowable != null) {
+                                               log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                       }
+                               });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       synchronized (lock) {
+                                               if (archivedExecutionGraph != null) {
+                                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                                               }
+                                       }
+                               }));

Review comment:
       Why is this special casing needed? Why wouldn't it work if we unconditionally set `jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED` in the handle callback when creating the `DispatcherJob`?
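To make the question concrete, here is a minimal, Flink-free model of the alternative the comment proposes: the `handle` callback switches to the initialized state unconditionally, so a cancellation requested during initialization no longer pins the status to `CANCELLING`. `StatusTrackingSketch`, its `Status` enum, and the `String` standing in for the `JobManagerRunner` are hypothetical names for illustration, not Flink API.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, Flink-free model of DispatcherJob's status tracking under the
 * reviewer's proposal. A String stands in for the JobManagerRunner.
 */
final class StatusTrackingSketch {

    enum Status { INITIALIZING, RUNNER_INITIALIZED, CANCELLING }

    private final Object lock = new Object();
    private Status status = Status.INITIALIZING; // guarded by lock

    StatusTrackingSketch(CompletableFuture<String> runnerFuture) {
        // Proposed variant: once the runner future completes, successfully or not,
        // switch to RUNNER_INITIALIZED unconditionally, even after a cancel request.
        runnerFuture.handle((runner, throwable) -> {
            synchronized (lock) {
                status = Status.RUNNER_INITIALIZED;
            }
            return null;
        });
    }

    /** Records a cancellation requested while the runner is still being created. */
    void cancelDuringInitialization() {
        synchronized (lock) {
            if (status == Status.INITIALIZING) {
                status = Status.CANCELLING;
            }
        }
    }

    Status status() {
        synchronized (lock) {
            return status;
        }
    }
}
```

With this variant, once the runner exists the status would presumably be answered by the JobMaster, which may still report `RUNNING` until it has processed the deferred cancel call. That window is plausibly what the `CANCELLING` special case guards against, but the sketch does not settle the question.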

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       @Test
+       public void testStatusWhenInitializing() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+       }
+
+       @Test
+       public void testStatusWhenRunning() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish initialization
+               testContext.setRunning();
+
+               assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+               // result future not done
+               Assert.assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+       }
+
+       @Test
+       public void testStatusWhenJobFinished() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // finish job
+               testContext.setRunning();
+               testContext.finishJob();
+
+               assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+               // assert result future done
+               DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+
+               Assert.assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileInitializing() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+               assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+               CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(
+                       TIMEOUT);
+
+               Assert.assertThat(cancelFuture.isDone(), is(false));
+               Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+               testContext.setRunning();
+               testContext.finishCancellation();
+
+               // assert that cancel future completes
+               cancelFuture.get();
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               // assert that the result future completes
+               Assert.assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileRunning() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               testContext.setRunning();
+               CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(TIMEOUT);
+
+               assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+               testContext.finishCancellation();
+
+               cancelFuture.get();
+               assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+               Assert.assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+       }
+
+       @Test
+       public void testStatusWhenCancellingWhileFailed() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               CommonTestUtils.assertThrows("Artificial failure", ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+       }
+
+       @Test
+       public void testErrorWhileInitializing() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               // now fail
+               RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+               testContext.failInitialization(exception);
+
+               Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+               assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+               ArchivedExecutionGraph aeg = dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+               Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()), is(exception));
+       }
+
+       @Test
+       public void testCloseWhileInitializingSuccessfully() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               // set job running, so that we can cancel it
+               testContext.setRunning();
+
+               // assert future completes now
+               closeFuture.get();
+
+               // ensure the result future is complete (how it completes is up to the JobManager)
+               CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
+               CommonTestUtils.assertThrows("has not been finished", ExecutionException.class,
+                       resultFuture::get);
+       }
+
+       @Test
+       public void testCloseWhileInitializingErroneously() throws Exception {
+               TestContext testContext = createTestContext();
+               DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+               CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+               Assert.assertThat(closeFuture.isDone(), is(false));
+
+               testContext.failInitialization(new RuntimeException("fail"));
+
+               // assert future completes now
+               closeFuture.get();
+
+               // ensure the result future is complete
+               DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+               Assert.assertThat(result.isInitializationFailure(), is(true));
+               Assert.assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));

Review comment:
       I would make this a separate test case which checks what the `resultFuture` value is if there is an initialization error.
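A dedicated test would mainly pin down that an initialization error completes the result future normally, with a result flagged as an initialization failure, rather than exceptionally. The sketch below models only that forwarding logic from the `DispatcherJob` constructor, without Flink. `ResultForwardingSketch`, `Runner`, and `SketchResult` are hypothetical stand-ins for `DispatcherJob`, `JobManagerRunner`, and `DispatcherJobResult`; a `String` replaces the `ArchivedExecutionGraph`, and the original's `FutureUtils.assertNoException` wrapper is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Hypothetical, Flink-free model of how DispatcherJob derives its result future
 * from the JobManagerRunner future. Not Flink API.
 */
final class ResultForwardingSketch {

    /** Stand-in for JobManagerRunner: exposes the eventual job outcome. */
    interface Runner {
        CompletableFuture<String> getResultFuture();
    }

    /** Stand-in for DispatcherJobResult; graphState replaces the ArchivedExecutionGraph. */
    static final class SketchResult {
        final String graphState;
        final boolean initializationFailure;

        SketchResult(String graphState, boolean initializationFailure) {
            this.graphState = graphState;
            this.initializationFailure = initializationFailure;
        }
    }

    static CompletableFuture<SketchResult> forward(CompletableFuture<Runner> runnerFuture) {
        CompletableFuture<SketchResult> resultFuture = new CompletableFuture<>();
        runnerFuture.whenComplete((runner, throwable) -> {
            if (throwable == null) {
                // Initialization succeeded: forward whatever the runner eventually reports.
                runner.getResultFuture().whenComplete((state, resultThrowable) -> {
                    if (state != null) {
                        resultFuture.complete(new SketchResult(state, false));
                    } else {
                        resultFuture.completeExceptionally(unwrap(resultThrowable));
                    }
                });
            } else {
                // Initialization failed: complete *normally* with a FAILED result.
                resultFuture.complete(new SketchResult("FAILED", true));
            }
        });
        return resultFuture;
    }

    /** Roughly mirrors ExceptionUtils.stripCompletionException for this sketch. */
    private static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }
}
```

In the real test, the corresponding checks would be `isInitializationFailure()` returning true and the archived graph being in state `FAILED`, asserted on their own rather than as a side note of the `closeAsync()` test.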

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+
+                               if (throwable == null) { // initialization succeeded
+                                       // Forward result future
+                                       jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
+                                               if (archivedExecutionGraph != null) {
+                                                       jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
+                                               } else {
+                                                       jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
+                                               }
+                                       });
+                               } else { // failure during initialization
+                                       final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+                                       ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFromInitializingJob(
+                                               jobId,
+                                               jobName,
+                                               JobStatus.FAILED,
+                                               strippedThrowable,
+                                               initializationTimestamp);
+                                       jobResultFuture.complete(DispatcherJobResult.forInitializationFailure(archivedExecutionGraph, strippedThrowable));
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<DispatcherJobResult> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               CompletableFuture<Acknowledge> cancelFuture = jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT));

Review comment:
       Why are we using `RpcUtils.INF_TIMEOUT` here instead of `timeout`?
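One way to see the trade-off: the deferred call is only issued once the `JobMasterGateway` is available, so the caller's `timeout` could bound that RPC alone or, alternatively, the whole operation including the wait for initialization. The sketch below shows both options with plain `CompletableFuture`s. `DeferredCancelSketch` and `Gateway` are hypothetical stand-ins for `DispatcherJob` and `JobMasterGateway`, `java.time.Duration` replaces Flink's `Time`, and `orTimeout` requires Java 9 or later.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: defer a cancel call until the gateway is available,
 * with two ways of applying the caller's timeout. Not Flink API.
 */
final class DeferredCancelSketch {

    /** Stand-in for JobMasterGateway#cancel; the gateway is assumed to honour the timeout itself. */
    interface Gateway {
        CompletableFuture<String> cancel(Duration timeout);
    }

    /** Option A: the caller's timeout bounds only the RPC, which starts once the gateway exists. */
    static CompletableFuture<String> cancelRpcBounded(
            CompletableFuture<Gateway> gatewayFuture, Duration timeout) {
        return gatewayFuture.thenCompose(gateway -> gateway.cancel(timeout));
    }

    /** Option B: the caller's timeout bounds the whole operation, including the wait for initialization. */
    static CompletableFuture<String> cancelOverallBounded(
            CompletableFuture<Gateway> gatewayFuture, Duration timeout) {
        return gatewayFuture
                .thenCompose(gateway -> gateway.cancel(timeout))
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

With `RpcUtils.INF_TIMEOUT`, neither bound effectively applies, so a JobMaster that never answers would leave the cancel future pending for a very long time. Option B, on the other hand, could fail a cancellation simply because initialization is slow.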

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {
+       private final boolean initializationFailure;
+       private final ArchivedExecutionGraph archivedExecutionGraph;
+       @Nullable
+       private final Throwable throwable;
+
+       private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable, boolean initializationFailure) {
+               this.archivedExecutionGraph = archivedExecutionGraph;
+               this.initializationFailure = initializationFailure;
+               this.throwable = throwable;
+       }
+
+       public boolean isInitializationFailure() {
+               return initializationFailure;
+       }
+
+       public ArchivedExecutionGraph getArchivedExecutionGraph() {
+               return archivedExecutionGraph;
+       }
+
+       public Throwable getThrowable() {
+               return throwable;
+       }
+
+       public static DispatcherJobResult forInitializationFailure(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
+               return new DispatcherJobResult(archivedExecutionGraph, throwable, true);
+       }
+
+       public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
+               return new DispatcherJobResult(archivedExecutionGraph, null, false);
+       }

Review comment:
       Nice.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.

Review comment:
       ```suggestion
        * Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization has failed.
       ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {
+       private final boolean initializationFailure;

Review comment:
       Couldn't we get rid of this field by letting `throwable` encode (by being set or not) whether it was an initialization failure?
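A minimal sketch of that suggestion, assuming static factory methods (`forSuccess` and `forInitializationFailure` are hypothetical names) and a stub `ArchivedGraph` standing in for Flink's `ArchivedExecutionGraph`. The nullable throwable doubles as the flag, so the two can never disagree.

```java
import java.util.Objects;

public class DispatcherJobResultSketch {

    // Hypothetical stand-in for ArchivedExecutionGraph; only carries a job name here.
    static final class ArchivedGraph {
        final String jobName;

        ArchivedGraph(String jobName) {
            this.jobName = jobName;
        }
    }

    // No boolean field: a non-null initializationFailure *is* the flag.
    static final class DispatcherJobResult {
        private final ArchivedGraph archivedExecutionGraph;
        private final Throwable initializationFailure; // null unless initialization failed

        private DispatcherJobResult(ArchivedGraph archivedExecutionGraph, Throwable initializationFailure) {
            this.archivedExecutionGraph = Objects.requireNonNull(archivedExecutionGraph);
            this.initializationFailure = initializationFailure;
        }

        // Hypothetical factory for a job that got past initialization.
        static DispatcherJobResult forSuccess(ArchivedGraph graph) {
            return new DispatcherJobResult(graph, null);
        }

        // Hypothetical factory for an initialization failure; the cause is mandatory.
        static DispatcherJobResult forInitializationFailure(ArchivedGraph graph, Throwable cause) {
            return new DispatcherJobResult(graph, Objects.requireNonNull(cause));
        }

        boolean isInitializationFailure() {
            return initializationFailure != null;
        }

        ArchivedGraph getArchivedExecutionGraph() {
            return archivedExecutionGraph;
        }
    }

    public static void main(String[] args) {
        ArchivedGraph graph = new ArchivedGraph("wordcount");
        System.out.println(DispatcherJobResult.forSuccess(graph).isInitializationFailure()); // prints "false"
        System.out.println(DispatcherJobResult
                .forInitializationFailure(graph, new Exception("boom"))
                .isInitializationFailure()); // prints "true"
    }
}
```

Making the constructor private and the cause mandatory in the failure factory means no caller can construct the inconsistent "failure without a throwable" state.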

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {
+       private final boolean initializationFailure;
+       private final ArchivedExecutionGraph archivedExecutionGraph;
+       @Nullable
+       private final Throwable throwable;
+
+       private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable, boolean initializationFailure) {
+               this.archivedExecutionGraph = archivedExecutionGraph;
+               this.initializationFailure = initializationFailure;
+               this.throwable = throwable;
+       }
+
+       public boolean isInitializationFailure() {
+               return initializationFailure;
+       }
+
+       public ArchivedExecutionGraph getArchivedExecutionGraph() {
+               return archivedExecutionGraph;
+       }
+
+       public Throwable getThrowable() {
+               return throwable;

Review comment:
       Before returning `throwable`, we should add a check which ensures that `throwable` is non-null.
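A minimal sketch of such a guard, using a hypothetical, simplified `Result` class. In Flink code the check would more likely be written as `Preconditions.checkState(throwable != null, ...)` from `org.apache.flink.util.Preconditions`, which `DispatcherJob` already imports; plain JDK code is used here so the example runs on its own.

```java
public class ThrowableGetterSketch {

    // Hypothetical simplified holder: the throwable is only set for initialization failures.
    static final class Result {
        private final Throwable throwable; // may be null

        Result(Throwable throwable) {
            this.throwable = throwable;
        }

        Throwable getThrowable() {
            // Fail fast with a descriptive message instead of handing null to the caller.
            if (throwable == null) {
                throw new IllegalStateException("No throwable is attached to this result.");
            }
            return throwable;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Result(new Exception("boom")).getThrowable().getMessage()); // prints "boom"
        try {
            new Result(null).getThrowable();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "No throwable is attached to this result."
        }
    }
}
```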

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
##########
@@ -81,13 +82,20 @@ private PerJobMiniClusterFactory(
        /**
         * Starts a {@link MiniCluster} and submits a job.
         */
-       public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
+       public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
                MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
                MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
                miniCluster.start();
 
                return miniCluster
                        .submitJob(jobGraph)
+                       .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+                               org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+                                       () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+                                       () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+                                       userCodeClassloader);
+                               return submissionResult;

Review comment:
       Given that no test fails, I guess it should be fine.
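For context, `FunctionUtils.uncheckedFunction` is what lets the blocking, checked-exception-throwing `waitUntilJobInitializationFinished` call run inside `thenApplyAsync`. The sketch below mimics that shape with plain JDK types. `unchecked`, `waitUntilInitialized`, `submit`, and the three-value `Status` enum are hypothetical stand-ins rather than Flink's implementations, and a simple polling loop is assumed for the wait.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

public class WaitForInitializationSketch {

    enum Status { INITIALIZING, RUNNING, FAILED }

    // Hypothetical counterpart of a function that may throw a checked exception.
    @FunctionalInterface
    interface FunctionWithException<T, R, E extends Throwable> {
        R apply(T value) throws E;
    }

    // Hypothetical adapter: wraps checked exceptions in a CompletionException,
    // so the resulting future completes exceptionally.
    static <T, R, E extends Throwable> Function<T, R> unchecked(FunctionWithException<T, R, E> function) {
        return value -> {
            try {
                return function.apply(value);
            } catch (RuntimeException e) {
                throw e;
            } catch (Throwable t) {
                throw new CompletionException(t);
            }
        };
    }

    // Hypothetical polling loop: blocks until the job has left INITIALIZING.
    static void waitUntilInitialized(Supplier<Status> statusSupplier) throws Exception {
        Status status;
        while ((status = statusSupplier.get()) == Status.INITIALIZING) {
            Thread.sleep(10);
        }
        if (status == Status.FAILED) {
            throw new Exception("Job initialization failed");
        }
    }

    // Completes with the job id once the (scripted) statuses leave INITIALIZING.
    static CompletableFuture<String> submit(String jobId, Status... statuses) {
        Iterator<Status> remaining = Arrays.asList(statuses).iterator();
        return CompletableFuture.completedFuture(jobId)
                .thenApplyAsync(unchecked(id -> {
                    waitUntilInitialized(remaining::next);
                    return id;
                }));
    }

    public static void main(String[] args) {
        System.out.println(submit("job-1", Status.INITIALIZING, Status.RUNNING).join()); // prints "job-1"
        try {
            submit("job-2", Status.INITIALIZING, Status.FAILED).join();
        } catch (CompletionException e) {
            System.out.println("failed: " + e.getCause().getMessage()); // prints "failed: Job initialization failed"
        }
    }
}
```

Running the wait via `thenApplyAsync` keeps the blocking poll off the thread that completed the submission future, which matches the shape of the change under review.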

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the ArchivedExecutionGraph and a flag whether the initialization failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+public class DispatcherJobResult {

Review comment:
       Could probably be package-private.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName) {

Review comment:
       Just a very minor comment: indenting the argument list with two tabs makes it easier to distinguish from the method body.
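A standalone sketch of how the quoted `DispatcherJobStatus` enum behaves: only some internal states have a public `JobStatus` counterpart, and `asJobStatus()` throws for the rest. `JobStatus` here is a trimmed stand-in for Flink's enum, and `describe` is a hypothetical helper whose parameter list is double-indented as the comment suggests (spaces stand in for tabs).

```java
public class DispatcherJobStatusSketch {

    // Trimmed stand-in for org.apache.flink.api.common.JobStatus (only the values used here).
    enum JobStatus { INITIALIZING, CANCELLING }

    // Mirrors the quoted enum: internal states that may or may not map to a public JobStatus.
    enum DispatcherJobStatus {
        INITIALIZING(JobStatus.INITIALIZING),
        JOB_MANAGER_RUNNER_INITIALIZED(null),
        CANCELLING(JobStatus.CANCELLING);

        private final JobStatus jobStatus; // null if there is no public equivalent

        DispatcherJobStatus(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }

        JobStatus asJobStatus() {
            if (jobStatus == null) {
                throw new IllegalStateException("This state is not defined as a 'JobStatus'");
            }
            return jobStatus;
        }
    }

    // Hypothetical helper: the parameter list is indented twice so it
    // stands apart from the method body.
    static String describe(
            DispatcherJobStatus status,
            String jobName) {
        return jobName + ": " + status.asJobStatus();
    }

    public static void main(String[] args) {
        System.out.println(describe(DispatcherJobStatus.INITIALIZING, "wordcount")); // prints "wordcount: INITIALIZING"
        try {
            describe(DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED, "wordcount");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "This state is not defined as a 'JobStatus'"
        }
    }
}
```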




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

