[FLINK-8673] [flip6] Use JobManagerRunner#resultFuture for success and failure 
communication

This commit removes the OnCompletionActions and FatalErrorHandler from the
JobManagerRunner. Instead, it communicates a successful job execution or the
failure case through the JobManagerRunner#resultFuture.

Furthermore, this commit no longer allows the JobManagerRunner to shut itself
down. All shutdown logic must be triggered by the owner of the
JobManagerRunner.

This closes #5510.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/075f5b69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/075f5b69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/075f5b69

Branch: refs/heads/master
Commit: 075f5b6930b86e110a32e290229b9800be72a3a7
Parents: 9b7e429
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 16 15:04:32 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:52 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  60 ++----
 .../runtime/dispatcher/MiniDispatcher.java      |   4 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  86 +++-----
 .../jobmaster/JobNotFinishedException.java      |  33 +++
 .../minicluster/MiniClusterJobDispatcher.java   |  44 +++-
 .../apache/flink/runtime/rpc/RpcService.java    |   4 +-
 .../runtime/dispatcher/DispatcherTest.java      |  21 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |  55 +++--
 .../runtime/jobmaster/JobManagerRunnerTest.java | 201 ++++++++++++++++++-
 9 files changed, 370 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 89b82ef..e751bc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -34,11 +34,11 @@ import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -243,10 +243,23 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                        blobServer,
                                        jobManagerSharedServices,
                                        metricRegistry,
-                                       new 
DispatcherOnCompleteActions(jobGraph.getJobID()),
-                                       fatalErrorHandler,
                                        restAddress);
 
+                               
jobManagerRunner.getResultFuture().whenCompleteAsync(
+                                       (ArchivedExecutionGraph 
archivedExecutionGraph, Throwable throwable) -> {
+                                               if (archivedExecutionGraph != 
null) {
+                                                       
jobReachedGloballyTerminalState(archivedExecutionGraph);
+                                               } else {
+                                                       final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+                                                       if (strippedThrowable 
instanceof JobNotFinishedException) {
+                                                               
jobNotFinished(jobId);
+                                                       } else {
+                                                               
onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " 
failed.", strippedThrowable));
+                                                       }
+                                               }
+                                       }, getMainThreadExecutor());
+
                                jobManagerRunner.start();
                        } catch (Exception e) {
                                try {
@@ -544,6 +557,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        archivedExecutionGraph.getJobID(),
                        archivedExecutionGraph.getState());
 
+               log.info("Job {} reached globally terminal state {}.", 
archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
+
                try {
                        archivedExecutionGraphStore.put(archivedExecutionGraph);
                } catch (IOException e) {
@@ -563,7 +578,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                }
        }
 
-       protected void jobFinishedByOther(JobID jobId) {
+       protected void jobNotFinished(JobID jobId) {
+               log.info("Job {} was not finished by JobManager.", jobId);
+
                try {
                        removeJob(jobId, false);
                } catch (Exception e) {
@@ -668,35 +685,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        //------------------------------------------------------
-       // Utility classes
-       //------------------------------------------------------
-
-       @VisibleForTesting
-       class DispatcherOnCompleteActions implements OnCompletionActions {
-
-               private final JobID jobId;
-
-               DispatcherOnCompleteActions(JobID jobId) {
-                       this.jobId = Preconditions.checkNotNull(jobId);
-               }
-
-               @Override
-               public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
-                       log.info("Job {} reached globally terminal state {}.", 
jobId, executionGraph.getState());
-
-                       runAsync(() -> 
Dispatcher.this.jobReachedGloballyTerminalState(executionGraph));
-               }
-
-               @Override
-               public void jobFinishedByOther() {
-                       log.info("Job {} was finished by other JobManager.", 
jobId);
-
-                       runAsync(
-                               () -> 
Dispatcher.this.jobFinishedByOther(jobId));
-               }
-       }
-
-       //------------------------------------------------------
        // Factories
        //------------------------------------------------------
 
@@ -715,8 +703,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        BlobServer blobServer,
                        JobManagerSharedServices jobManagerServices,
                        MetricRegistry metricRegistry,
-                       OnCompletionActions onCompleteActions,
-                       FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress) throws Exception;
        }
 
@@ -737,8 +723,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                BlobServer blobServer,
                                JobManagerSharedServices jobManagerServices,
                                MetricRegistry metricRegistry,
-                               OnCompletionActions onCompleteActions,
-                               FatalErrorHandler fatalErrorHandler,
                                @Nullable String restAddress) throws Exception {
                        return new JobManagerRunner(
                                resourceId,
@@ -750,8 +734,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                blobServer,
                                jobManagerServices,
                                metricRegistry,
-                               onCompleteActions,
-                               fatalErrorHandler,
                                restAddress);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 36732ce..c648131 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -124,8 +124,8 @@ public class MiniDispatcher extends Dispatcher {
        }
 
        @Override
-       protected void jobFinishedByOther(JobID jobId) {
-               super.jobFinishedByOther(jobId);
+       protected void jobNotFinished(JobID jobId) {
+               super.jobNotFinished(jobId);
 
                // shut down since we have done our job
                shutDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 5e476bf..285cb4a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -51,7 +51,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -72,12 +71,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
        /** The job graph needs to run. */
        private final JobGraph jobGraph;
 
-       /** The listener to notify once the job completes - either successfully 
or unsuccessfully. */
-       private final OnCompletionActions toNotifyOnComplete;
-
-       /** The handler to call in case of fatal (unrecoverable) errors. */
-       private final FatalErrorHandler errorHandler;
-
        /** Used to check whether a job needs to be run. */
        private final RunningJobsRegistry runningJobsRegistry;
 
@@ -94,6 +87,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
 
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 
+       private final CompletableFuture<Void> terminationFuture;
+
        /** flag marking the runner as shut down. */
        private volatile boolean shutdown;
 
@@ -119,17 +114,16 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final BlobServer blobServer,
                        final JobManagerSharedServices jobManagerSharedServices,
                        final MetricRegistry metricRegistry,
-                       final OnCompletionActions toNotifyOnComplete,
-                       final FatalErrorHandler errorHandler,
                        @Nullable final String restAddress) throws Exception {
 
                JobManagerMetricGroup jobManagerMetrics = null;
 
+               this.resultFuture = new CompletableFuture<>();
+               this.terminationFuture = new CompletableFuture<>();
+
                // make sure we cleanly shut down out JobManager services if 
initialization fails
                try {
                        this.jobGraph = checkNotNull(jobGraph);
-                       this.toNotifyOnComplete = 
checkNotNull(toNotifyOnComplete);
-                       this.errorHandler = checkNotNull(errorHandler);
                        this.jobManagerSharedServices = 
checkNotNull(jobManagerSharedServices);
 
                        checkArgument(jobGraph.getNumberOfVertices() > 0, "The 
given job is empty");
@@ -176,8 +170,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                userCodeLoader,
                                restAddress,
                                metricRegistry.getMetricQueryServicePath());
-
-                       this.resultFuture = new CompletableFuture<>();
                }
                catch (Throwable t) {
                        // clean up everything
@@ -185,6 +177,9 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                jobManagerMetrics.close();
                        }
 
+                       terminationFuture.completeExceptionally(t);
+                       resultFuture.completeExceptionally(t);
+
                        throw new JobExecutionException(jobGraph.getJobID(), 
"Could not set up JobManager", t);
                }
        }
@@ -225,34 +220,43 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
 
        private CompletableFuture<Void> shutdownInternally() {
                synchronized (lock) {
-                       shutdown = true;
+                       if (!shutdown) {
+                               shutdown = true;
 
-                       jobManager.shutDown();
+                               jobManager.shutDown();
 
-                       return jobManager.getTerminationFuture()
-                               .thenAccept(
-                                       ignored -> {
-                                               Throwable exception = null;
+                               final CompletableFuture<Boolean> 
jobManagerTerminationFuture = jobManager.getTerminationFuture();
+
+                               jobManagerTerminationFuture.whenComplete(
+                                       (Boolean ignored, Throwable throwable) 
-> {
                                                try {
                                                        
leaderElectionService.stop();
                                                } catch (Throwable t) {
-                                                       exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
+                                                       throwable = 
ExceptionUtils.firstOrSuppressed(t, 
ExceptionUtils.stripCompletionException(throwable));
                                                }
 
                                                // make all registered metrics 
go away
                                                try {
                                                        
jobManagerMetricGroup.close();
                                                } catch (Throwable t) {
-                                                       exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
+                                                       throwable = 
ExceptionUtils.firstOrSuppressed(t, throwable);
                                                }
 
-                                               if (exception != null) {
-                                                       throw new 
CompletionException(new FlinkException("Could not properly shut down the 
JobManagerRunner.", exception));
+                                               if (throwable != null) {
+                                                       
terminationFuture.completeExceptionally(
+                                                               new 
FlinkException("Could not properly shut down the JobManagerRunner", throwable));
+                                               } else {
+                                                       
terminationFuture.complete(null);
                                                }
+                                       });
 
-                                               // cancel the result future if 
not already completed
-                                               resultFuture.cancel(false);
+                               terminationFuture.whenComplete(
+                                       (Void ignored, Throwable throwable) -> {
+                                               
resultFuture.completeExceptionally(new 
JobNotFinishedException(jobGraph.getJobID()));
                                        });
+                       }
+
+                       return terminationFuture;
                }
        }
 
@@ -267,16 +271,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
executionGraph) {
                // complete the result future with the terminal execution graph
                resultFuture.complete(executionGraph);
-
-               try {
-                       unregisterJobFromHighAvailability();
-                       shutdownInternally();
-               }
-               finally {
-                       if (toNotifyOnComplete != null) {
-                               
toNotifyOnComplete.jobReachedGloballyTerminalState(executionGraph);
-                       }
-               }
+               unregisterJobFromHighAvailability();
        }
 
        /**
@@ -284,14 +279,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
         */
        @Override
        public void jobFinishedByOther() {
-               try {
-                       shutdownInternally();
-               }
-               finally {
-                       if (toNotifyOnComplete != null) {
-                               toNotifyOnComplete.jobFinishedByOther();
-                       }
-               }
+               resultFuture.completeExceptionally(new 
JobNotFinishedException(jobGraph.getJobID()));
        }
 
        /**
@@ -306,17 +294,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        log.error("JobManager runner encountered a fatal 
error.", exception);
                } catch (Throwable ignored) {}
 
-               // in any case, notify our handler, so it can react fast
-               try {
-                       if (errorHandler != null) {
-                               errorHandler.onFatalError(exception);
-                       }
-               }
-               finally {
-                       // the shutdown may not even needed any more, if the 
fatal error
-                       // handler kills the process. that is fine, a process 
kill cleans up better than anything.
-                       shutdownInternally();
-               }
+               resultFuture.completeExceptionally(exception);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java
new file mode 100644
index 0000000..3515f2f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobNotFinishedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.JobException;
+
+/**
+ * Exception indicating that a Flink job has not been finished.
+ */
+public class JobNotFinishedException extends JobException {
+       private static final long serialVersionUID = 611413276562570622L;
+
+       public JobNotFinishedException(JobID jobId) {
+               super("The job (" + jobId + ") has been not been finished.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 6fba534..ef7a6e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -142,10 +143,10 @@ public class MiniClusterJobDispatcher {
                        MetricRegistry metricRegistry,
                        int numJobManagers,
                        RpcService[] rpcServices) throws Exception {
-               
+
                checkArgument(numJobManagers >= 1);
                checkArgument(rpcServices.length == numJobManagers);
-               
+
                this.configuration = checkNotNull(config);
                this.rpcServices = rpcServices;
                this.haServices = checkNotNull(haServices);
@@ -237,7 +238,7 @@ public class MiniClusterJobDispatcher {
         * This method runs a job in blocking mode. The method returns only 
after the job
         * completed successfully, or after it failed terminally.
         *
-        * @param job  The Flink job to execute 
+        * @param job  The Flink job to execute
         * @return The result of the job execution
         *
         * @throws JobExecutionException Thrown if anything went amiss during 
initial job launch,
@@ -245,7 +246,7 @@ public class MiniClusterJobDispatcher {
         */
        public JobExecutionResult runJobBlocking(JobGraph job) throws 
JobExecutionException, InterruptedException {
                checkNotNull(job);
-               
+
                LOG.info("Received job for blocking execution: {} ({})", 
job.getName(), job.getJobID());
                final BlockingJobSync sync = new 
BlockingJobSync(job.getJobID(), numJobManagers);
 
@@ -287,9 +288,32 @@ public class MiniClusterJobDispatcher {
                                        blobServer,
                                        jobManagerSharedServices,
                                        metricRegistry,
-                                       onCompletion,
-                                       errorHandler,
                                        null);
+
+                               final int index = i;
+
+                               runners[i].getResultFuture()
+                                       .whenComplete(
+                                               (ArchivedExecutionGraph 
archivedExecutionGraph, Throwable throwable) -> {
+                                                       try {
+                                                               
runners[index].shutdown();
+                                                       } catch (Exception e) {
+                                                               
errorHandler.onFatalError(e);
+                                                       }
+
+                                                       if 
(archivedExecutionGraph != null) {
+                                                               
onCompletion.jobReachedGloballyTerminalState(archivedExecutionGraph);
+                                                       } else {
+                                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+                                                               if 
(strippedThrowable instanceof JobNotFinishedException) {
+                                                                       
onCompletion.jobFinishedByOther();
+                                                               } else {
+                                                                       
errorHandler.onFatalError(strippedThrowable);
+                                                               }
+                                                       }
+                                               });
+
                                runners[i].start();
                        }
                        catch (Throwable t) {
@@ -395,8 +419,8 @@ public class MiniClusterJobDispatcher {
         * This class is used to sync on blocking jobs across multiple runners.
         * Only after all runners reported back that they are finished, the
         * result will be released.
-        * 
-        * That way it is guaranteed that after the blocking job submit call 
returns,
+        *
+        * <p>That way it is guaranteed that after the blocking job submit call 
returns,
         * the dispatcher is immediately free to accept another job.
         */
        private static class BlockingJobSync implements OnCompletionActions, 
FatalErrorHandler {
@@ -408,7 +432,7 @@ public class MiniClusterJobDispatcher {
                private volatile Throwable runnerException;
 
                private volatile JobResult result;
-               
+
                BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
                        this.jobId = jobId;
                        this.jobMastersToWaitFor = new 
CountDownLatch(numJobMastersToWaitFor);
@@ -430,6 +454,8 @@ public class MiniClusterJobDispatcher {
                        if (runnerException == null) {
                                runnerException = exception;
                        }
+
+                       jobMastersToWaitFor.countDown();
                }
 
                public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 9b2e318..089e4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -130,13 +130,13 @@ public interface RpcService {
        /**
         * Gets the executor, provided by this RPC service. This executor can 
be used for example for
         * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods 
of futures.
-        * 
+        *
         * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against
         * any concurrent invocations and is therefore not suitable to run 
completion methods of futures
         * that modify state of an {@link RpcEndpoint}. For such operations, 
one needs to use the
         * {@link RpcEndpoint#getMainThreadExecutor() 
MainThreadExecutionContext} of that
         * {@code RpcEndpoint}.
-        * 
+        *
         * @return The execution context provided by the RPC service
         */
        Executor getExecutor();

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 6ea6383..46cd9e2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
@@ -35,7 +36,6 @@ import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
@@ -129,12 +129,12 @@ public class DispatcherTest extends TestLogger {
        private TestingDispatcher dispatcher;
 
        @BeforeClass
-       public static void setup() {
+       public static void setupClass() {
                rpcService = new TestingRpcService();
        }
 
        @AfterClass
-       public static void teardown() {
+       public static void teardownClass() {
                if (rpcService != null) {
                        rpcService.stopService();
 
@@ -274,10 +274,7 @@ public class DispatcherTest extends TestLogger {
 
                final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-               OnCompletionActions onCompletionActions;
-
                final JobID failedJobId = new JobID();
-               onCompletionActions = dispatcher.new 
DispatcherOnCompleteActions(failedJobId);
 
                final JobStatus expectedState = JobStatus.FAILED;
                final ArchivedExecutionGraph failedExecutionGraph = new 
ArchivedExecutionGraphBuilder()
@@ -286,7 +283,7 @@ public class DispatcherTest extends TestLogger {
                        .setFailureCause(new ErrorInfo(new 
RuntimeException("expected"), 1L))
                        .build();
 
-               
onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph);
+               dispatcher.completeJobExecution(failedExecutionGraph);
 
                assertThat(
                        dispatcherGateway.requestJobStatus(failedJobId, 
TIMEOUT).get(),
@@ -398,6 +395,12 @@ public class DispatcherTest extends TestLogger {
                                super.recoverJobs();
                        }
                }
+
+               @VisibleForTesting
+               void completeJobExecution(ArchivedExecutionGraph 
archivedExecutionGraph) {
+                       runAsync(
+                               () -> 
jobReachedGloballyTerminalState(archivedExecutionGraph));
+               }
        }
 
        private static final class ExpectedJobIdJobManagerRunnerFactory 
implements Dispatcher.JobManagerRunnerFactory {
@@ -419,8 +422,6 @@ public class DispatcherTest extends TestLogger {
                                BlobServer blobServer,
                                JobManagerSharedServices 
jobManagerSharedServices,
                                MetricRegistry metricRegistry,
-                               OnCompletionActions onCompleteActions,
-                               FatalErrorHandler fatalErrorHandler,
                                @Nullable String restAddress) throws Exception {
                        assertEquals(expectedJobId, jobGraph.getJobID());
 
@@ -434,8 +435,6 @@ public class DispatcherTest extends TestLogger {
                                blobServer,
                                jobManagerSharedServices,
                                metricRegistry,
-                               onCompleteActions,
-                               fatalErrorHandler,
                                restAddress);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 1040eee..dfabc61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -40,7 +39,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -67,6 +65,7 @@ import java.util.concurrent.CompletableFuture;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link MiniDispatcher}.
@@ -74,18 +73,15 @@ import static org.mockito.Mockito.mock;
 @Category(Flip6.class)
 public class MiniDispatcherTest extends TestLogger {
 
-       private static final JobGraph jobGraph = new JobGraph();
-
-       private static final ArchivedExecutionGraph archivedExecutionGraph = 
new ArchivedExecutionGraphBuilder()
-               .setJobID(jobGraph.getJobID())
-               .setState(JobStatus.FINISHED)
-               .build();
-
        private static final Time timeout = Time.seconds(10L);
 
        @ClassRule
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+       private static JobGraph jobGraph;
+
+       private static ArchivedExecutionGraph archivedExecutionGraph;
+
        private static TestingRpcService rpcService;
 
        private static Configuration configuration;
@@ -100,6 +96,8 @@ public class MiniDispatcherTest extends TestLogger {
 
        private CompletableFuture<JobGraph> jobGraphFuture;
 
+       private CompletableFuture<ArchivedExecutionGraph> resultFuture;
+
        private TestingLeaderElectionService dispatcherLeaderElectionService;
 
        private TestingHighAvailabilityServices highAvailabilityServices;
@@ -110,6 +108,13 @@ public class MiniDispatcherTest extends TestLogger {
 
        @BeforeClass
        public static void setupClass() throws IOException {
+               jobGraph = new JobGraph();
+
+               archivedExecutionGraph = new ArchivedExecutionGraphBuilder()
+                       .setJobID(jobGraph.getJobID())
+                       .setState(JobStatus.FINISHED)
+                       .build();
+
                rpcService = new TestingRpcService();
                configuration = new Configuration();
 
@@ -127,8 +132,9 @@ public class MiniDispatcherTest extends TestLogger {
                
highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
 
                jobGraphFuture = new CompletableFuture<>();
+               resultFuture = new CompletableFuture<>();
 
-               testingJobManagerRunnerFactory = new 
TestingJobManagerRunnerFactory(jobGraphFuture);
+               testingJobManagerRunnerFactory = new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
        }
 
        @After
@@ -180,15 +186,13 @@ public class MiniDispatcherTest extends TestLogger {
                miniDispatcher.start();
 
                try {
-                       final Dispatcher.DispatcherOnCompleteActions 
completeActions = miniDispatcher.new 
DispatcherOnCompleteActions(jobGraph.getJobID());
-
                        // wait until the Dispatcher is the leader
                        
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                        // wait until we have submitted the job
                        jobGraphFuture.get();
 
-                       
completeActions.jobReachedGloballyTerminalState(archivedExecutionGraph);
+                       resultFuture.complete(archivedExecutionGraph);
 
                        // wait until we terminate
                        miniDispatcher.getTerminationFuture().get();
@@ -208,15 +212,13 @@ public class MiniDispatcherTest extends TestLogger {
                miniDispatcher.start();
 
                try {
-                       final Dispatcher.DispatcherOnCompleteActions 
completeActions = miniDispatcher.new 
DispatcherOnCompleteActions(jobGraph.getJobID());
-
                        // wait until the Dispatcher is the leader
                        
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
                        // wait until we have submitted the job
                        jobGraphFuture.get();
 
-                       
completeActions.jobReachedGloballyTerminalState(archivedExecutionGraph);
+                       resultFuture.complete(archivedExecutionGraph);
 
                        final CompletableFuture<Boolean> terminationFuture = 
miniDispatcher.getTerminationFuture();
 
@@ -262,16 +264,31 @@ public class MiniDispatcherTest extends TestLogger {
        private static final class TestingJobManagerRunnerFactory implements 
Dispatcher.JobManagerRunnerFactory {
 
                private final CompletableFuture<JobGraph> jobGraphFuture;
+               private final CompletableFuture<ArchivedExecutionGraph> 
resultFuture;
 
-               private 
TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture) {
+               private 
TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, 
CompletableFuture<ArchivedExecutionGraph> resultFuture) {
                        this.jobGraphFuture = jobGraphFuture;
+                       this.resultFuture = resultFuture;
                }
 
                @Override
-               public JobManagerRunner createJobManagerRunner(ResourceID 
resourceId, JobGraph jobGraph, Configuration configuration, RpcService 
rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, BlobServer blobServer, 
JobManagerSharedServices jobManagerSharedServices, MetricRegistry 
metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler 
fatalErrorHandler, @Nullable String restAddress) throws Exception {
+               public JobManagerRunner createJobManagerRunner(
+                               ResourceID resourceId,
+                               JobGraph jobGraph,
+                               Configuration configuration,
+                               RpcService rpcService,
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               HeartbeatServices heartbeatServices,
+                               BlobServer blobServer,
+                               JobManagerSharedServices 
jobManagerSharedServices,
+                               MetricRegistry metricRegistry,
+                               @Nullable String restAddress) throws Exception {
                        jobGraphFuture.complete(jobGraph);
 
-                       return mock(JobManagerRunner.class);
+                       final JobManagerRunner mock = 
mock(JobManagerRunner.class);
+                       when(mock.getResultFuture()).thenReturn(resultFuture);
+
+                       return mock;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/075f5b69/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 174422f..ba72293 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -18,7 +18,204 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-public class JobManagerRunnerTest {
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link JobManagerRunner}.
+ */
+public class JobManagerRunnerTest extends TestLogger {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static Configuration configuration;
+
+       private static TestingRpcService rpcService;
+
+       private static BlobServer blobServer;
+
+       private static HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
+
+       private static JobManagerSharedServices jobManagerSharedServices;
+
+       private static MetricRegistry metricRegistry;
+
+       private static JobGraph jobGraph;
+
+       private static ArchivedExecutionGraph archivedExecutionGraph;
+
+       private TestingHighAvailabilityServices haServices;
+
+       @BeforeClass
+       public static void setupClass() throws Exception {
+               configuration = new Configuration();
+               rpcService = new TestingRpcService();
+
+               configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
+               blobServer = new BlobServer(
+                       configuration,
+                       new VoidBlobStore());
+
+               jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(configuration, blobServer);
+
+               metricRegistry = NoOpMetricRegistry.INSTANCE;
+
+               final JobVertex jobVertex = new JobVertex("Test vertex");
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+               jobGraph = new JobGraph(jobVertex);
+
+               archivedExecutionGraph = new ArchivedExecutionGraphBuilder()
+                       .setJobID(jobGraph.getJobID())
+                       .setState(JobStatus.FINISHED)
+                       .build();
+       }
+
+       @Before
+       public void setup() {
+               haServices = new TestingHighAvailabilityServices();
+               
haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new 
TestingLeaderElectionService());
+               haServices.setResourceManagerLeaderRetriever(new 
TestingLeaderRetrievalService());
+               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+       }
+
+       @After
+       public void tearDown() {
+
+       }
+
+       @AfterClass
+       public static void tearDownClass() throws Exception {
+               if (jobManagerSharedServices != null) {
+                       jobManagerSharedServices.shutdown();
+               }
+
+               if (blobServer != null) {
+                       blobServer.close();
+               }
+
+               if (rpcService != null) {
+                       rpcService.stopService();
+               }
+       }
        
-       // TODO: Test that 
+       @Test
+       public void testJobCompletion() throws Exception {
+               final JobManagerRunner jobManagerRunner = 
createJobManagerRunner();
+
+               try {
+                       jobManagerRunner.start();
+
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = jobManagerRunner.getResultFuture();
+
+                       assertThat(resultFuture.isDone(), is(false));
+
+                       
jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph);
+
+                       assertThat(resultFuture.get(), 
is(archivedExecutionGraph));
+               } finally {
+                       jobManagerRunner.shutdown();
+               }
+       }
+
+       @Test
+       public void testJobFinishedByOther() throws Exception {
+               final JobManagerRunner jobManagerRunner = 
createJobManagerRunner();
+
+               try {
+                       jobManagerRunner.start();
+
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = jobManagerRunner.getResultFuture();
+
+                       assertThat(resultFuture.isDone(), is(false));
+
+                       jobManagerRunner.jobFinishedByOther();
+
+                       try {
+                               resultFuture.get();
+                               fail("Should have failed.");
+                       } catch (ExecutionException ee) {
+                               
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(JobNotFinishedException.class));
+                       }
+               } finally {
+                       jobManagerRunner.shutdown();
+               }
+       }
+
+       @Test
+       public void testShutDown() throws Exception {
+               final JobManagerRunner jobManagerRunner = 
createJobManagerRunner();
+
+               try {
+                       jobManagerRunner.start();
+
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = jobManagerRunner.getResultFuture();
+
+                       assertThat(resultFuture.isDone(), is(false));
+
+                       jobManagerRunner.shutdown();
+
+                       try {
+                               resultFuture.get();
+                               fail("Should have failed.");
+                       } catch (ExecutionException ee) {
+                               
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(JobNotFinishedException.class));
+                       }
+               } finally {
+                       jobManagerRunner.shutdown();
+               }
+       }
+
+       @Nonnull
+       private JobManagerRunner createJobManagerRunner() throws Exception {
+               return new JobManagerRunner(
+                       ResourceID.generate(),
+                       jobGraph,
+                       configuration,
+                       rpcService,
+                       haServices,
+                       heartbeatServices,
+                       blobServer,
+                       jobManagerSharedServices,
+                       metricRegistry,
+                       null);
+       }
 }

Reply via email to