This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69db02bddbb535bfc918e97a796037e122a87a71
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Oct 21 20:55:44 2020 +0200

    [hotfix] Merge ApplicationDispatcherBootstrap#initialize() with constructor.
---
 .../ApplicationDispatcherBootstrap.java            | 38 +++++-----
 ...ApplicationDispatcherGatewayServiceFactory.java |  3 +-
 .../ApplicationDispatcherBootstrapTest.java        | 84 ++++++++++++----------
 .../flink/runtime/dispatcher/Dispatcher.java       | 10 +--
 .../runtime/dispatcher/DispatcherBootstrap.java    | 13 +---
 ...tstrap.java => DispatcherBootstrapFactory.java} | 23 ++----
 .../runtime/dispatcher/DispatcherFactory.java      |  4 +-
 .../runtime/dispatcher/JobDispatcherFactory.java   |  4 +-
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  4 +-
 .../dispatcher/NoOpDispatcherBootstrap.java        |  6 --
 .../dispatcher/SessionDispatcherFactory.java       |  4 +-
 .../runtime/dispatcher/StandaloneDispatcher.java   |  4 +-
 .../DefaultDispatcherGatewayServiceFactory.java    |  2 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  2 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  7 +-
 .../runtime/dispatcher/MiniDispatcherTest.java     |  2 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  3 +-
 .../runner/DefaultDispatcherRunnerITCase.java      |  6 +-
 18 files changed, 97 insertions(+), 122 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index a817048..1443804 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -84,7 +84,9 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
 
        private final FatalErrorHandler errorHandler;
 
-       private CompletableFuture<Void> applicationCompletionFuture;
+       private final CompletableFuture<Void> applicationCompletionFuture;
+
+       private final CompletableFuture<Acknowledge> clusterShutdownFuture;
 
        private ScheduledFuture<?> applicationExecutionTask;
 
@@ -92,20 +94,19 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
                        final PackagedProgram application,
                        final Collection<JobID> recoveredJobIds,
                        final Configuration configuration,
+                       final DispatcherGateway dispatcherGateway,
+                       final ScheduledExecutor scheduledExecutor,
                        final FatalErrorHandler errorHandler) {
                this.configuration = checkNotNull(configuration);
                this.recoveredJobIds = checkNotNull(recoveredJobIds);
                this.application = checkNotNull(application);
                this.errorHandler = checkNotNull(errorHandler);
-       }
 
-       @Override
-       public void initialize(final DispatcherGateway dispatcherGateway, 
ScheduledExecutor scheduledExecutor) {
-               checkNotNull(dispatcherGateway);
+               this.applicationCompletionFuture =
+                               
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
 
-               runApplicationAndShutdownClusterAsync(
-                               dispatcherGateway,
-                               scheduledExecutor);
+               this.clusterShutdownFuture =
+                               
runApplicationAndShutdownClusterAsync(dispatcherGateway);
        }
 
        @Override
@@ -124,17 +125,21 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
                return applicationExecutionTask;
        }
 
+       @VisibleForTesting
+       CompletableFuture<Void> getApplicationCompletionFuture() {
+               return applicationCompletionFuture;
+       }
+
+       @VisibleForTesting
+       CompletableFuture<Acknowledge> getClusterShutdownFuture() {
+               return clusterShutdownFuture;
+       }
+
        /**
         * Runs the user program entrypoint and shuts down the given 
dispatcherGateway when
         * the application completes (either successfully or in case of 
failure).
         */
-       @VisibleForTesting
-       CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
-                       final DispatcherGateway dispatcherGateway,
-                       final ScheduledExecutor scheduledExecutor) {
-
-               applicationCompletionFuture = 
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
-
+       private CompletableFuture<Acknowledge> 
runApplicationAndShutdownClusterAsync(final DispatcherGateway 
dispatcherGateway) {
                return applicationCompletionFuture
                                .handle((r, t) -> {
                                        if (t != null) {
@@ -163,8 +168,7 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
                                .thenCompose(Function.identity());
        }
 
-       @VisibleForTesting
-       CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
+       private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
                        final DispatcherGateway dispatcherGateway,
                        final ScheduledExecutor scheduledExecutor) {
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index a749fad..a492cf8 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -88,7 +88,8 @@ public class ApplicationDispatcherGatewayServiceFactory 
implements AbstractDispa
                                        rpcService,
                                        fencingToken,
                                        recoveredJobs,
-                                       errorHandler -> new 
ApplicationDispatcherBootstrap(application, recoveredJobIds, configuration, 
errorHandler),
+                                       (dispatcherGateway, scheduledExecutor, 
errorHandler) -> new ApplicationDispatcherBootstrap(
+                                                       application, 
recoveredJobIds, configuration, dispatcherGateway, scheduledExecutor, 
errorHandler),
                                        
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, 
jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the 
Dispatcher rpc endpoint.", e);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
index df1c786..09c8dbe 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -263,10 +264,11 @@ public class ApplicationDispatcherBootstrapTest {
                                })
                                .setRequestJobResultFunction(jobId -> 
CompletableFuture.completedFuture(createCancelledJobResult(jobId)));
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), 
scheduledExecutor);
 
                final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+                               bootstrap.getClusterShutdownFuture();
 
                // wait until the bootstrap "thinks" it's done, also makes sure 
that we don't
                // fail the future exceptionally with a JobCancelledException
@@ -282,10 +284,10 @@ public class ApplicationDispatcherBootstrapTest {
                                .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.FINISHED))
                                .setRequestJobResultFunction(jobId -> 
CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)));
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), 
scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                ScheduledFuture<?> applicationExecutionFuture = 
bootstrap.getApplicationExecutionFuture();
 
@@ -306,10 +308,9 @@ public class ApplicationDispatcherBootstrapTest {
                // In production, this will shut down the cluster with an 
exception.
                final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
                final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
-                               3, errorHandlerFuture::completeExceptionally);
+                               3, dispatcherBuilder.build(), 
scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                ScheduledFuture<?> applicationExecutionFuture = 
bootstrap.getApplicationExecutionFuture();
 
@@ -335,10 +336,9 @@ public class ApplicationDispatcherBootstrapTest {
                // In production, this will shut down the cluster with an 
exception.
                final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
                final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
-                               2, errorHandlerFuture::completeExceptionally);
+                               2, dispatcherBuilder.build(), 
scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                bootstrap.stop();
 
@@ -363,12 +363,11 @@ public class ApplicationDispatcherBootstrapTest {
                // we're "listening" on this to be completed to verify that the 
error handler is called.
                // In production, this will shut down the cluster with an 
exception.
                final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+               final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
                final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
-                               3, errorHandlerFuture::completeExceptionally);
+                               3, dispatcherGateway, scheduledExecutor, 
errorHandlerFuture::completeExceptionally);
 
-               final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                // we call the error handler
                assertException(errorHandlerFuture, 
ApplicationExecutionException.class);
@@ -392,10 +391,10 @@ public class ApplicationDispatcherBootstrapTest {
                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), 
scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                // wait until the bootstrap "thinks" it's done
                shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -419,10 +418,10 @@ public class ApplicationDispatcherBootstrapTest {
                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, dispatcherBuilder.build(), 
scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                // wait until the bootstrap "thinks" it's done
                shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -446,10 +445,10 @@ public class ApplicationDispatcherBootstrapTest {
                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               ApplicationDispatcherBootstrap bootstrap =
+                               createApplicationDispatcherBootstrap(3, 
dispatcherBuilder.build(), scheduledExecutor);
 
-               final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+               final CompletableFuture<Acknowledge> shutdownFuture = 
bootstrap.getClusterShutdownFuture();
 
                // wait until the bootstrap "thinks" it's done
                shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -471,11 +470,11 @@ public class ApplicationDispatcherBootstrapTest {
                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
-
                final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
-               final CompletableFuture<Acknowledge> applicationFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, 
scheduledExecutor);
+               final ApplicationDispatcherBootstrap bootstrap =
+                               createApplicationDispatcherBootstrap(3, 
dispatcherGateway, scheduledExecutor);
+
+               final CompletableFuture<Acknowledge> applicationFuture = 
bootstrap.getClusterShutdownFuture();
 
                final ApplicationFailureException exception = 
assertException(applicationFuture, ApplicationFailureException.class);
                assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
@@ -509,22 +508,31 @@ public class ApplicationDispatcherBootstrapTest {
 
                final ApplicationDispatcherBootstrap bootstrap =
                                new ApplicationDispatcherBootstrap(
-                                               program, 
Collections.emptyList(), configuration, exception -> {});
-
-               return bootstrap.fixJobIdAndRunApplicationAsync(
-                               dispatcherBuilder.build(),
-                               scheduledExecutor);
+                                               program,
+                                               Collections.emptyList(),
+                                               configuration,
+                                               dispatcherBuilder.build(),
+                                               scheduledExecutor,
+                                               exception -> {});
+
+               return bootstrap.getApplicationCompletionFuture();
        }
 
-       private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException {
-               return createApplicationDispatcherBootstrap(noOfJobs, exception 
-> {});
+       private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
+                       final int noOfJobs,
+                       final DispatcherGateway dispatcherGateway,
+                       final ScheduledExecutor scheduledExecutor) throws 
FlinkException {
+               return createApplicationDispatcherBootstrap(noOfJobs, 
dispatcherGateway, scheduledExecutor, exception -> {});
        }
 
        private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-                       int noOfJobs,
-                       FatalErrorHandler errorHandler) throws FlinkException {
+                       final int noOfJobs,
+                       final DispatcherGateway dispatcherGateway,
+                       final ScheduledExecutor scheduledExecutor,
+                       final FatalErrorHandler errorHandler) throws 
FlinkException {
                final PackagedProgram program = getProgram(noOfJobs);
-               return new ApplicationDispatcherBootstrap(program, 
Collections.emptyList(), getConfiguration(), errorHandler);
+               return new ApplicationDispatcherBootstrap(
+                               program, Collections.emptyList(), 
getConfiguration(), dispatcherGateway, scheduledExecutor, errorHandler);
        }
 
        private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4aeba30..55e2c3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -122,7 +122,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
        private final Collection<JobGraph> recoveredJobs;
 
-       private final Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory;
+       private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
 
        private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
@@ -145,7 +145,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
                checkNotNull(dispatcherServices);
@@ -206,9 +206,11 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        throw exception;
                }
 
-               this.dispatcherBootstrap = 
this.dispatcherBootstrapFactory.apply(this::onFatalError);
                startRecoveredJobs();
-               
this.dispatcherBootstrap.initialize(getSelfGateway(DispatcherGateway.class), 
this.getRpcService().getScheduledExecutor());
+               this.dispatcherBootstrap = 
this.dispatcherBootstrapFactory.create(
+                               getSelfGateway(DispatcherGateway.class),
+                               this.getRpcService().getScheduledExecutor() ,
+                               this::onFatalError);
        }
 
        private void startDispatcherServices() throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
index 5cf9336..d8b7b80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 
 /**
  * An interface containing the logic of bootstrapping the {@link Dispatcher} 
of a cluster.
@@ -28,18 +27,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 public interface DispatcherBootstrap {
 
        /**
-        * Initializes the {@link Dispatcher} provided as an argument.
-        *
-        * <p>IMPORTANT: In HA settings, this method will run during
-        * the initialization of the **leader** dispatcher.
-        *
-        * @param dispatcher the dispatcher to be initialized.
-        */
-       void initialize(final DispatcherGateway dispatcher, ScheduledExecutor 
scheduledExecutor) throws Exception;
-
-       /**
         * Stops and frees any resources (e.g. threads) acquired
-        * by the {@link #initialize(DispatcherGateway, ScheduledExecutor)}.
+        * during the execution of the bootstrap.
         */
        void stop() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java
similarity index 67%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java
index 9d3934d..2cbba75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java
@@ -20,25 +20,16 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 
 /**
- * A {@link DispatcherBootstrap} which submits the provided {@link JobGraph 
job graphs}
- * for execution upon dispatcher initialization.
+ * A factory to create a {@link DispatcherBootstrap}.
  */
 @Internal
-public class NoOpDispatcherBootstrap implements DispatcherBootstrap {
+public interface DispatcherBootstrapFactory {
 
-       public NoOpDispatcherBootstrap() {
-       }
-
-       @Override
-       public void initialize(final DispatcherGateway dispatcher, 
ScheduledExecutor scheduledExecutor) {
-
-       }
-
-       @Override
-       public void stop() throws Exception {
-               // do nothing
-       }
+       DispatcherBootstrap create(
+                       final DispatcherGateway dispatcher,
+                       final ScheduledExecutor scheduledExecutor,
+                       final FatalErrorHandler errorHandler) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
index 54a8338..2ad968d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * {@link Dispatcher} factory interface.
@@ -37,6 +35,6 @@ public interface DispatcherFactory {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index 7b08e10..c779c19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -21,13 +21,11 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 import static 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
 
@@ -42,7 +40,7 @@ public enum JobDispatcherFactory implements DispatcherFactory 
{
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
                final JobGraph jobGraph = 
Iterables.getOnlyElement(recoveredJobs);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 568e456..b06453d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
 
@@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -59,7 +57,7 @@ public class MiniDispatcher extends Dispatcher {
                        DispatcherId fencingToken,
                        DispatcherServices dispatcherServices,
                        JobGraph jobGraph,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        JobClusterEntrypoint.ExecutionMode executionMode) 
throws Exception {
                super(
                        rpcService,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
index 9d3934d..b4a3b36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /**
@@ -33,11 +32,6 @@ public class NoOpDispatcherBootstrap implements 
DispatcherBootstrap {
        }
 
        @Override
-       public void initialize(final DispatcherGateway dispatcher, 
ScheduledExecutor scheduledExecutor) {
-
-       }
-
-       @Override
        public void stop() throws Exception {
                // do nothing
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 05fcc14..3aa7b4b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
@@ -36,7 +34,7 @@ public enum SessionDispatcherFactory implements 
DispatcherFactory {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
                // create the default dispatcher
                return new StandaloneDispatcher(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 6b7b942..8d88d6d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * Dispatcher implementation which spawns a {@link JobMaster} for each
@@ -36,7 +34,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(
                        rpcService,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
index adc1440..67a3231 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
@@ -63,7 +63,7 @@ class DefaultDispatcherGatewayServiceFactory implements 
AbstractDispatcherLeader
                                rpcService,
                                fencingToken,
                                recoveredJobs,
-                               errorHandler -> new NoOpDispatcherBootstrap(),
+                               (dispatcherGateway, scheduledExecutor, 
errorHandler) -> new NoOpDispatcherBootstrap(),
                                
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, 
jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the 
Dispatcher rpc endpoint.", e);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 06c9cc8b..fdbf165 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -194,7 +194,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                        rpcService,
                        DispatcherId.generate(),
                        Collections.emptyList(),
-                       errorHandler -> new NoOpDispatcherBootstrap(),
+                       (dispatcher, scheduledExecutor, errorHandler) -> new 
NoOpDispatcherBootstrap(),
                        new DispatcherServices(
                                configuration,
                                highAvailabilityServices,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3ccc6b8..1ffaf86 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -99,7 +99,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -206,8 +205,8 @@ public class DispatcherTest extends TestLogger {
 
                private Collection<JobGraph> initialJobGraphs = 
Collections.emptyList();
 
-               private Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory =
-                               errorHandler -> new NoOpDispatcherBootstrap();
+               private DispatcherBootstrapFactory dispatcherBootstrapFactory =
+                               (dispatcher, scheduledExecutor, errorHandler) 
-> new NoOpDispatcherBootstrap();
 
                private HeartbeatServices heartbeatServices = 
DispatcherTest.this.heartbeatServices;
 
@@ -233,7 +232,7 @@ public class DispatcherTest extends TestLogger {
                }
 
                TestingDispatcherBuilder setDispatcherBootstrapFactory(
-                               Function<FatalErrorHandler, 
DispatcherBootstrap> dispatcherBootstrapFactory) {
+                               DispatcherBootstrapFactory 
dispatcherBootstrapFactory) {
                        this.dispatcherBootstrapFactory = 
dispatcherBootstrapFactory;
                        return this;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 4794797..bcded74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -251,7 +251,7 @@ public class MiniDispatcherTest extends TestLogger {
                                highAvailabilityServices.getJobGraphStore(),
                                testingJobManagerRunnerFactory),
                        jobGraph,
-                       errorHandler -> new NoOpDispatcherBootstrap(),
+                       (dispatcher, scheduledExecutor, errorHandler) -> new 
NoOpDispatcherBootstrap(),
                        executionMode);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 7b2bbcb..646fcda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nonnull;
@@ -42,7 +41,7 @@ class TestingDispatcher extends Dispatcher {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(
                        rpcService,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index 51325bf..2d5613e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -66,7 +65,6 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
@@ -209,7 +207,7 @@ public class DefaultDispatcherRunnerITCase extends 
TestLogger {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        Collection<JobGraph> recoveredJobs,
-                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       DispatcherBootstrapFactory dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
                        return new StandaloneDispatcher(
                                rpcService,

Reply via email to