asfgit closed pull request #6743: [FLINK-10411] Make ClusterEntrypoint more 
compositional
URL: https://github.com/apache/flink/pull/6743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
new file mode 100644
index 00000000000..3e0645d6859
--- /dev/null
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
+ * on the class path.
+ */
+public class ClassPathJobGraphRetriever implements JobGraphRetriever {
+
+       @Nonnull
+       private final String jobClassName;
+
+       @Nonnull
+       private final SavepointRestoreSettings savepointRestoreSettings;
+
+       @Nonnull
+       private final String[] programArguments;
+
+       public ClassPathJobGraphRetriever(
+                       @Nonnull String jobClassName,
+                       @Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
+                       @Nonnull String[] programArguments) {
+               this.jobClassName = jobClassName;
+               this.savepointRestoreSettings = savepointRestoreSettings;
+               this.programArguments = programArguments;
+       }
+
+       @Override
+       public JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
+               final PackagedProgram packagedProgram = createPackagedProgram();
+               final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+               try {
+                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
defaultParallelism);
+                       jobGraph.setAllowQueuedScheduling(true);
+                       
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+                       return jobGraph;
+               } catch (Exception e) {
+                       throw new FlinkException("Could not create the JobGraph 
from the provided user code jar.", e);
+               }
+       }
+
+       private PackagedProgram createPackagedProgram() throws FlinkException {
+               try {
+                       final Class<?> mainClass = 
getClass().getClassLoader().loadClass(jobClassName);
+                       return new PackagedProgram(mainClass, programArguments);
+               } catch (ClassNotFoundException | ProgramInvocationException e) 
{
+                       throw new FlinkException("Could not load the provided 
entrypoint class.", e);
+               }
+       }
+}
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0e400950b5c..6c42bf23a22 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,38 +18,20 @@
 
 package org.apache.flink.container.entrypoint;
 
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,62 +61,10 @@
        }
 
        @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               final PackagedProgram packagedProgram = createPackagedProgram();
-               final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-               try {
-                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
defaultParallelism);
-                       jobGraph.setAllowQueuedScheduling(true);
-                       
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
-
-                       return jobGraph;
-               } catch (Exception e) {
-                       throw new FlinkException("Could not create the JobGraph 
from the provided user code jar.", e);
-               }
-       }
-
-       private PackagedProgram createPackagedProgram() throws FlinkException {
-               try {
-                       final Class<?> mainClass = 
getClass().getClassLoader().loadClass(jobClassName);
-                       return new PackagedProgram(mainClass, programArguments);
-               } catch (ClassNotFoundException | ProgramInvocationException e) 
{
-                       throw new FlinkException("Could not load the provided 
entrypoint class.", e);
-               }
-       }
-
-       @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{
-               terminationFuture.thenAccept((status) -> 
shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
-       }
-
-       @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       resourceManagerRuntimeServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new StandaloneResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       resourceManagerRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       resourceManagerRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler);
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
+                       StandaloneResourceManagerFactory.INSTANCE,
+                       new ClassPathJobGraphRetriever(jobClassName, 
savepointRestoreSettings, programArguments));
        }
 
        public static void main(String[] args) {
@@ -164,7 +94,6 @@ public static void main(String[] args) {
                        clusterConfiguration.getSavepointRestoreSettings(),
                        clusterConfiguration.getArgs());
 
-               entrypoint.startCluster();
+               ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
-
 }
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
similarity index 81%
rename from 
flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
rename to 
flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 2faec4c6627..6e460e1f894 100644
--- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -32,24 +32,24 @@
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ * Tests for the {@link ClassPathJobGraphRetriever}.
  */
-public class StandaloneJobClusterEntryPointTest extends TestLogger {
+public class ClassPathJobGraphRetrieverTest extends TestLogger {
 
        public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
        @Test
        public void testJobGraphRetrieval() throws FlinkException {
-               final Configuration configuration = new Configuration();
                final int parallelism = 42;
+               final Configuration configuration = new Configuration();
                configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
parallelism);
-               final StandaloneJobClusterEntryPoint 
standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-                       configuration,
+
+               final ClassPathJobGraphRetriever classPathJobGraphRetriever = 
new ClassPathJobGraphRetriever(
                        TestJob.class.getCanonicalName(),
                        SavepointRestoreSettings.none(),
                        PROGRAM_ARGUMENTS);
 
-               final JobGraph jobGraph = 
standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+               final JobGraph jobGraph = 
classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
                assertThat(jobGraph.getName(), 
is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
                assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
@@ -59,13 +59,13 @@ public void testJobGraphRetrieval() throws FlinkException {
        public void testSavepointRestoreSettings() throws FlinkException {
                final Configuration configuration = new Configuration();
                final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath("foobar", true);
-               final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new 
StandaloneJobClusterEntryPoint(
-                       configuration,
+
+               final ClassPathJobGraphRetriever classPathJobGraphRetriever = 
new ClassPathJobGraphRetriever(
                        TestJob.class.getCanonicalName(),
                        savepointRestoreSettings,
                        PROGRAM_ARGUMENTS);
 
-               final JobGraph jobGraph = 
jobClusterEntryPoint.retrieveJobGraph(configuration);
+               final JobGraph jobGraph = 
classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
                assertThat(jobGraph.getSavepointRestoreSettings(), 
is(equalTo(savepointRestoreSettings)));
        }
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index ada434dd8b7..27d1f321d0b 100644
--- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -25,7 +25,7 @@
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 /**
- * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ * Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
  */
 public class TestJob {
 
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 38b61d86679..377b5b5971a 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,31 +20,22 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -52,13 +43,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,8 +50,6 @@
  */
 public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
-       public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
        // 
------------------------------------------------------------------------
        //  Command-line options
        // 
------------------------------------------------------------------------
@@ -113,58 +95,6 @@ protected void initializeServices(Configuration config) 
throws Exception {
                taskManagerContainerSpec = 
MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
        }
 
-       @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new MesosResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       configuration,
-                       mesosServices,
-                       schedulerConfiguration,
-                       taskManagerParameters,
-                       taskManagerContainerSpec,
-                       webInterfaceUrl);
-       }
-
-       @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-               File fp = new File(jobGraphFile);
-
-               try (FileInputStream input = new FileInputStream(fp);
-                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
-
-                       return (JobGraph) obInput.readObject();
-               } catch (FileNotFoundException e) {
-                       throw new FlinkException("Could not find the JobGraph 
file.", e);
-               } catch (ClassNotFoundException | IOException e) {
-                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
-               }
-       }
-
        @Override
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                final CompletableFuture<Void> serviceShutDownFuture = 
super.stopClusterServices(cleanupHaData);
@@ -179,7 +109,15 @@ protected JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkExc
        }
 
        @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{}
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
+                       new MesosResourceManagerFactory(
+                               mesosServices,
+                               schedulerConfiguration,
+                               taskManagerParameters,
+                               taskManagerContainerSpec),
+                       FileJobGraphRetriever.createFrom(configuration));
+       }
 
        public static void main(String[] args) {
                // startup checks and logging
@@ -204,6 +142,6 @@ public static void main(String[] args) {
 
                MesosJobClusterEntrypoint clusterEntrypoint = new 
MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
-               clusterEntrypoint.startCluster();
+               ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
        }
 }
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 70369f61f2a..98796282cf2 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,25 +20,18 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -49,8 +42,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -103,42 +94,6 @@ protected void initializeServices(Configuration config) 
throws Exception {
                taskManagerContainerSpec = 
MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
        }
 
-       @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new MesosResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       configuration,
-                       mesosServices,
-                       mesosConfig,
-                       taskManagerParameters,
-                       taskManagerContainerSpec,
-                       webInterfaceUrl);
-       }
-
        @Override
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                final CompletableFuture<Void> serviceShutDownFuture = 
super.stopClusterServices(cleanupHaData);
@@ -152,6 +107,16 @@ protected void initializeServices(Configuration config) 
throws Exception {
                        });
        }
 
+       @Override
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new SessionDispatcherResourceManagerComponentFactory(
+                       new MesosResourceManagerFactory(
+                               mesosServices,
+                               mesosConfig,
+                               taskManagerParameters,
+                               taskManagerContainerSpec));
+       }
+
        public static void main(String[] args) {
                // startup checks and logging
                EnvironmentInformation.logEnvironmentInfo(LOG, 
MesosSessionClusterEntrypoint.class.getSimpleName(), args);
@@ -175,7 +140,7 @@ public static void main(String[] args) {
 
                MesosSessionClusterEntrypoint clusterEntrypoint = new 
MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
-               clusterEntrypoint.startCluster();
+               ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
        }
 
 }
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
new file mode 100644
index 00000000000..9582e9f2e23
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
+ */
+public class MesosResourceManagerFactory implements 
ResourceManagerFactory<RegisteredMesosWorkerNode> {
+
+       @Nonnull
+       private final MesosServices mesosServices;
+
+       @Nonnull
+       private final MesosConfiguration schedulerConfiguration;
+
+       @Nonnull
+       private final MesosTaskManagerParameters taskManagerParameters;
+
+       @Nonnull
+       private final ContainerSpecification taskManagerContainerSpec;
+
+       public MesosResourceManagerFactory(@Nonnull MesosServices 
mesosServices, @Nonnull MesosConfiguration schedulerConfiguration, @Nonnull 
MesosTaskManagerParameters taskManagerParameters, @Nonnull 
ContainerSpecification taskManagerContainerSpec) {
+               this.mesosServices = mesosServices;
+               this.schedulerConfiguration = schedulerConfiguration;
+               this.taskManagerParameters = taskManagerParameters;
+               this.taskManagerContainerSpec = taskManagerContainerSpec;
+       }
+
+       @Override
+       public ResourceManager<RegisteredMesosWorkerNode> 
createResourceManager(Configuration configuration, ResourceID resourceId, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, 
FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, 
@Nullable String webInterfaceUrl) throws Exception {
+               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       rmServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new MesosResourceManager(
+                       rpcService,
+                       ResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       rmRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       rmRuntimeServices.getJobLeaderIdService(),
+                       clusterInformation,
+                       fatalErrorHandler,
+                       configuration,
+                       mesosServices,
+                       schedulerConfiguration,
+                       taskManagerParameters,
+                       taskManagerContainerSpec,
+                       webInterfaceUrl);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
new file mode 100644
index 00000000000..59522998690
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} factory interface.
+ */
+public interface DispatcherFactory<T extends Dispatcher> {
+
+       /**
+        * Create a {@link Dispatcher} of the given type {@link T}.
+        */
+       T createDispatcher(
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               ResourceManagerGateway resourceManagerGateway,
+               BlobServer blobServer,
+               HeartbeatServices heartbeatServices,
+               JobManagerMetricGroup jobManagerMetricGroup,
+               @Nullable String metricQueryServicePath,
+               ArchivedExecutionGraphStore archivedExecutionGraphStore,
+               FatalErrorHandler fatalErrorHandler,
+               @Nullable String restAddress,
+               HistoryServerArchivist historyServerArchivist) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
new file mode 100644
index 00000000000..e6b1a26e99c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import static 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link MiniDispatcher}.
+ */
+public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> 
{
+
+       private final JobGraphRetriever jobGraphRetriever;
+
+       public JobDispatcherFactory(JobGraphRetriever jobGraphRetriever) {
+               this.jobGraphRetriever = jobGraphRetriever;
+       }
+
+       @Override
+       public MiniDispatcher createDispatcher(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       ResourceManagerGateway resourceManagerGateway,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
+                       FatalErrorHandler fatalErrorHandler,
+                       @Nullable String restAddress,
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
+               final JobGraph jobGraph = 
jobGraphRetriever.retrieveJobGraph(configuration);
+
+               final String executionModeValue = 
configuration.getString(EXECUTION_MODE);
+
+               final ClusterEntrypoint.ExecutionMode executionMode = 
ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
+
+               return new MiniDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME,
+                       configuration,
+                       highAvailabilityServices,
+                       resourceManagerGateway,
+                       blobServer,
+                       heartbeatServices,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
+                       archivedExecutionGraphStore,
+                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+                       fatalErrorHandler,
+                       restAddress,
+                       historyServerArchivist,
+                       jobGraph,
+                       executionMode);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
new file mode 100644
index 00000000000..18e29a09711
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
+ */
+public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
+       INSTANCE;
+
+       @Override
+       public Dispatcher createDispatcher(
+                               Configuration configuration,
+                               RpcService rpcService,
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               ResourceManagerGateway resourceManagerGateway,
+                               BlobServer blobServer,
+                               HeartbeatServices heartbeatServices,
+                               JobManagerMetricGroup jobManagerMetricGroup,
+                               @Nullable String metricQueryServicePath,
+                               ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
+                               FatalErrorHandler fatalErrorHandler,
+                               @Nullable String restAddress,
+                               HistoryServerArchivist historyServerArchivist) 
throws Exception {
+               // create the default dispatcher
+               return new StandaloneDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME,
+                       configuration,
+                       highAvailabilityServices,
+                       resourceManagerGateway,
+                       blobServer,
+                       heartbeatServices,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
+                       archivedExecutionGraphStore,
+                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+                       fatalErrorHandler,
+                       restAddress,
+                       historyServerArchivist);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a8c0581192..c9a17227f79 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,32 +32,21 @@
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -65,11 +54,6 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -91,7 +75,10 @@
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -109,21 +96,23 @@
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(ClusterEntrypoint.class);
 
-       protected static final int SUCCESS_RETURN_CODE = 0;
        protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
        protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
+       private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = 
Time.seconds(30L);
+
        /** The lock to guard startup / shutdown / manipulation methods. */
        private final Object lock = new Object();
 
        private final Configuration configuration;
 
-       private final CompletableFuture<Void> terminationFuture;
-
-       private final AtomicBoolean isTerminating = new AtomicBoolean(false);
+       private final CompletableFuture<ApplicationStatus> terminationFuture;
 
        private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
+       @GuardedBy("lock")
+       private DispatcherResourceManagerComponent<?> clusterComponent;
+
        @GuardedBy("lock")
        private MetricRegistryImpl metricRegistry;
 
@@ -139,33 +128,12 @@
        @GuardedBy("lock")
        private RpcService commonRpcService;
 
-       @GuardedBy("lock")
-       private ResourceManager<?> resourceManager;
-
-       @GuardedBy("lock")
-       private Dispatcher dispatcher;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService resourceManagerRetrievalService;
-
-       @GuardedBy("lock")
-       private WebMonitorEndpoint<?> webMonitorEndpoint;
-
        @GuardedBy("lock")
        private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
        @GuardedBy("lock")
        private TransientBlobCache transientBlobCache;
 
-       @GuardedBy("lock")
-       private ClusterInformation clusterInformation;
-
-       @GuardedBy("lock")
-       private JobManagerMetricGroup jobManagerMetricGroup;
-
        private final Thread shutDownHook;
 
        protected ClusterEntrypoint(Configuration configuration) {
@@ -175,11 +143,11 @@ protected ClusterEntrypoint(Configuration configuration) {
                shutDownHook = 
ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, 
getClass().getSimpleName(), LOG);
        }
 
-       public CompletableFuture<Void> getTerminationFuture() {
+       public CompletableFuture<ApplicationStatus> getTerminationFuture() {
                return terminationFuture;
        }
 
-       protected void startCluster() {
+       protected void startCluster() throws ClusterEntrypointException {
                LOG.info("Starting {}.", getClass().getSimpleName());
 
                try {
@@ -194,17 +162,24 @@ protected void startCluster() {
                        });
                } catch (Throwable t) {
                        final Throwable strippedThrowable = 
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-                       LOG.error("Cluster initialization failed.", 
strippedThrowable);
 
-                       shutDownAndTerminate(
-                               STARTUP_FAILURE_RETURN_CODE,
-                               ApplicationStatus.FAILED,
-                               strippedThrowable.getMessage(),
-                               false);
+                       try {
+                               // clean up any partial state
+                               shutDownAsync(
+                                       ApplicationStatus.FAILED,
+                                       
ExceptionUtils.stringifyException(strippedThrowable),
+                                       
false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                               strippedThrowable.addSuppressed(e);
+                       }
+
+                       throw new ClusterEntrypointException(
+                               String.format("Failed to initialize the cluster 
entrypoint %s.", getClass().getSimpleName()),
+                               strippedThrowable);
                }
        }
 
-       protected void configureFileSystems(Configuration configuration) throws 
Exception {
+       private void configureFileSystems(Configuration configuration) throws 
Exception {
                LOG.info("Install default filesystem.");
 
                try {
@@ -223,7 +198,7 @@ protected SecurityContext 
installSecurityContext(Configuration configuration) th
                return SecurityUtils.getInstalledContext();
        }
 
-       protected void runCluster(Configuration configuration) throws Exception 
{
+       private void runCluster(Configuration configuration) throws Exception {
                synchronized (lock) {
                        initializeServices(configuration);
 
@@ -231,27 +206,33 @@ protected void runCluster(Configuration configuration) 
throws Exception {
                        configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
                        configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
 
-                       startClusterComponents(
+                       final DispatcherResourceManagerComponentFactory<?> 
dispatcherResourceManagerComponentFactory = 
createDispatcherResourceManagerComponentFactory(configuration);
+
+                       clusterComponent = 
dispatcherResourceManagerComponentFactory.create(
                                configuration,
                                commonRpcService,
                                haServices,
                                blobServer,
                                heartbeatServices,
-                               metricRegistry);
+                               metricRegistry,
+                               archivedExecutionGraphStore,
+                               this);
 
-                       dispatcher.getTerminationFuture().whenComplete(
-                               (Void value, Throwable throwable) -> {
+                       clusterComponent.getShutDownFuture().whenComplete(
+                               (ApplicationStatus applicationStatus, Throwable 
throwable) -> {
                                        if (throwable != null) {
-                                               LOG.info("Could not properly 
terminate the Dispatcher.", throwable);
+                                               shutDownAsync(
+                                                       
ApplicationStatus.UNKNOWN,
+                                                       
ExceptionUtils.stringifyException(throwable),
+                                                       false);
+                                       } else {
+                                               // This is the general shutdown 
path. If a separate more specific shutdown was
+                                               // already triggered, this will 
do nothing
+                                               shutDownAsync(
+                                                       applicationStatus,
+                                                       null,
+                                                       true);
                                        }
-
-                                       // This is the general shutdown path. 
If a separate more specific shutdown was
-                                       // already triggered, this will do 
nothing
-                                       shutDownAndTerminate(
-                                               SUCCESS_RETURN_CODE,
-                                               ApplicationStatus.SUCCEEDED,
-                                               throwable != null ? 
throwable.getMessage() : null,
-                                               true);
                                });
                }
        }
@@ -283,99 +264,11 @@ protected void initializeServices(Configuration 
configuration) throws Exception
 
                        archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
commonRpcService.getScheduledExecutor());
 
-                       clusterInformation = new ClusterInformation(
-                               commonRpcService.getAddress(),
-                               blobServer.getPort());
-
                        transientBlobCache = new TransientBlobCache(
                                configuration,
                                new InetSocketAddress(
-                                       
clusterInformation.getBlobServerHostname(),
-                                       
clusterInformation.getBlobServerPort()));
-               }
-       }
-
-       protected void startClusterComponents(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry) throws Exception {
-               synchronized (lock) {
-                       dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
-
-                       resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
-
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
-                               rpcService,
-                               DispatcherGateway.class,
-                               DispatcherId::fromUuid,
-                               10,
-                               Time.milliseconds(50L));
-
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
-                               rpcService,
-                               ResourceManagerGateway.class,
-                               ResourceManagerId::fromUuid,
-                               10,
-                               Time.milliseconds(50L));
-
-                       // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
-                       final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
-                       final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
-                       webMonitorEndpoint = createRestEndpoint(
-                               configuration,
-                               dispatcherGatewayRetriever,
-                               resourceManagerGatewayRetriever,
-                               transientBlobCache,
-                               rpcService.getExecutor(),
-                               new AkkaQueryServiceRetriever(actorSystem, 
timeout),
-                               
highAvailabilityServices.getWebMonitorLeaderElectionService());
-
-                       LOG.debug("Starting Dispatcher REST endpoint.");
-                       webMonitorEndpoint.start();
-
-                       resourceManager = createResourceManager(
-                               configuration,
-                               ResourceID.generate(),
-                               rpcService,
-                               highAvailabilityServices,
-                               heartbeatServices,
-                               metricRegistry,
-                               this,
-                               clusterInformation,
-                               webMonitorEndpoint.getRestBaseUrl());
-
-                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
-                               metricRegistry,
-                               rpcService.getAddress(),
-                               
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
-                       final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
-
-                       dispatcher = createDispatcher(
-                               configuration,
-                               rpcService,
-                               highAvailabilityServices,
-                               
resourceManager.getSelfGateway(ResourceManagerGateway.class),
-                               blobServer,
-                               heartbeatServices,
-                               jobManagerMetricGroup,
-                               metricRegistry.getMetricQueryServicePath(),
-                               archivedExecutionGraphStore,
-                               this,
-                               webMonitorEndpoint.getRestBaseUrl(),
-                               historyServerArchivist);
-
-                       LOG.debug("Starting ResourceManager.");
-                       resourceManager.start();
-                       
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
-
-                       LOG.debug("Starting Dispatcher.");
-                       dispatcher.start();
-                       
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+                                       commonRpcService.getAddress(),
+                                       blobServer.getPort()));
                }
        }
 
@@ -477,63 +370,6 @@ protected MetricRegistryImpl 
createMetricRegistry(Configuration configuration) {
                }
        }
 
-       protected CompletableFuture<Void> stopClusterComponents() {
-               synchronized (lock) {
-
-                       Exception exception = null;
-
-                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
-
-                       if (dispatcherLeaderRetrievalService != null) {
-                               try {
-                                       dispatcherLeaderRetrievalService.stop();
-                               } catch (Exception e) {
-                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-                       }
-
-                       if (resourceManagerRetrievalService != null) {
-                               try {
-                                       resourceManagerRetrievalService.stop();
-                               } catch (Exception e) {
-                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-                       }
-
-                       if (webMonitorEndpoint != null) {
-                               
terminationFutures.add(webMonitorEndpoint.closeAsync());
-                       }
-
-                       if (dispatcher != null) {
-                               dispatcher.shutDown();
-                               
terminationFutures.add(dispatcher.getTerminationFuture());
-                       }
-
-                       if (resourceManager != null) {
-                               resourceManager.shutDown();
-                               
terminationFutures.add(resourceManager.getTerminationFuture());
-                       }
-
-                       if (exception != null) {
-                               
terminationFutures.add(FutureUtils.completedExceptionally(exception));
-                       }
-
-                       final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-
-                       if (jobManagerMetricGroup != null) {
-                               return FutureUtils.runAfterwards(
-                                       componentTerminationFuture,
-                                       () -> {
-                                               synchronized (lock) {
-                                                       
jobManagerMetricGroup.close();
-                                               }
-                                       });
-                       } else {
-                               return componentTerminationFuture;
-                       }
-               }
-       }
-
        @Override
        public void onFatalError(Throwable exception) {
                LOG.error("Fatal error occurred in the cluster entrypoint.", 
exception);
@@ -556,21 +392,20 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
                return resultConfiguration;
        }
 
-       private CompletableFuture<Void> shutDownAsync(
-                       boolean cleanupHaData,
+       private CompletableFuture<ApplicationStatus> shutDownAsync(
                        ApplicationStatus applicationStatus,
-                       @Nullable String diagnostics) {
+                       @Nullable String diagnostics,
+                       boolean cleanupHaData) {
                if (isShutDown.compareAndSet(false, true)) {
-                       LOG.info("Stopping {}.", getClass().getSimpleName());
-
-                       final CompletableFuture<Void> shutDownApplicationFuture 
= deregisterApplication(applicationStatus, diagnostics);
+                       LOG.info("Shutting {} down with application status {}. 
Diagnostics {}.",
+                               getClass().getSimpleName(),
+                               applicationStatus,
+                               diagnostics);
 
-                       final CompletableFuture<Void> componentShutdownFuture = 
FutureUtils.composeAfterwards(
-                               shutDownApplicationFuture,
-                               this::stopClusterComponents);
+                       final CompletableFuture<Void> shutDownApplicationFuture 
= closeClusterComponent(applicationStatus, diagnostics);
 
                        final CompletableFuture<Void> serviceShutdownFuture = 
FutureUtils.composeAfterwards(
-                               componentShutdownFuture,
+                               shutDownApplicationFuture,
                                () -> stopClusterServices(cleanupHaData));
 
                        final CompletableFuture<Void> cleanupDirectoriesFuture 
= FutureUtils.runAfterwards(
@@ -582,7 +417,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
                                        if (serviceThrowable != null) {
                                                
terminationFuture.completeExceptionally(serviceThrowable);
                                        } else {
-                                               
terminationFuture.complete(null);
+                                               
terminationFuture.complete(applicationStatus);
                                        }
                                });
                }
@@ -590,36 +425,6 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
                return terminationFuture;
        }
 
-       protected void shutDownAndTerminate(
-               int returnCode,
-               ApplicationStatus applicationStatus,
-               @Nullable String diagnostics,
-               boolean cleanupHaData) {
-
-               if (isTerminating.compareAndSet(false, true)) {
-                       LOG.info("Shut down and terminate {} with return code 
{} and application status {}.",
-                               getClass().getSimpleName(),
-                               returnCode,
-                               applicationStatus);
-
-                       shutDownAsync(
-                               cleanupHaData,
-                               applicationStatus,
-                               diagnostics).whenComplete(
-                               (Void ignored, Throwable t) -> {
-                                       if (t != null) {
-                                               LOG.info("Could not properly 
shut down cluster entrypoint.", t);
-                                       }
-
-                                       System.exit(returnCode);
-                               });
-               } else {
-                       LOG.debug("Concurrent termination call detected. 
Ignoring termination call with return code {} and application status {}.",
-                               returnCode,
-                               applicationStatus);
-               }
-       }
-
        /**
         * Deregister the Flink application from the resource management system 
by signalling
         * the {@link ResourceManager}.
@@ -628,11 +433,12 @@ protected void shutDownAndTerminate(
         * @param diagnostics additional information about the shut down, can 
be {@code null}
         * @return Future which is completed once the shut down
         */
-       private CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
+       private CompletableFuture<Void> closeClusterComponent(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
                synchronized (lock) {
-                       if (resourceManager != null) {
-                               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-                               return 
selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack 
-> null);
+                       if (clusterComponent != null) {
+                               final CompletableFuture<Void> 
deregisterApplicationFuture = 
clusterComponent.deregisterApplication(applicationStatus, diagnostics);
+
+                               return 
FutureUtils.runAfterwards(deregisterApplicationFuture, 
clusterComponent::closeAsync);
                        } else {
                                return CompletableFuture.completedFuture(null);
                        }
@@ -656,39 +462,7 @@ private void cleanupDirectories() throws IOException {
        // Abstract methods
        // --------------------------------------------------
 
-       protected abstract Dispatcher createDispatcher(
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               ResourceManagerGateway resourceManagerGateway,
-               BlobServer blobServer,
-               HeartbeatServices heartbeatServices,
-               JobManagerMetricGroup jobManagerMetricGroup,
-               @Nullable String metricQueryServicePath,
-               ArchivedExecutionGraphStore archivedExecutionGraphStore,
-               FatalErrorHandler fatalErrorHandler,
-               @Nullable String restAddress,
-               HistoryServerArchivist historyServerArchivist) throws Exception;
-
-       protected abstract ResourceManager<?> createResourceManager(
-               Configuration configuration,
-               ResourceID resourceId,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
-               FatalErrorHandler fatalErrorHandler,
-               ClusterInformation clusterInformation,
-               @Nullable String webInterfaceUrl) throws Exception;
-
-       protected abstract WebMonitorEndpoint<?> createRestEndpoint(
-               Configuration configuration,
-               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-               LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-               TransientBlobService transientBlobService,
-               Executor executor,
-               MetricQueryServiceRetriever metricQueryServiceRetriever,
-               LeaderElectionService leaderElectionService) throws Exception;
+       protected abstract DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration);
 
        protected abstract ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                Configuration configuration,
@@ -719,6 +493,34 @@ protected static Configuration 
loadConfiguration(EntrypointClusterConfiguration
                return configuration;
        }
 
+       // --------------------------------------------------
+       // Helper methods
+       // --------------------------------------------------
+
+       public static void runClusterEntrypoint(ClusterEntrypoint 
clusterEntrypoint) {
+
+               final String clusterEntrypointName = 
clusterEntrypoint.getClass().getSimpleName();
+               try {
+                       clusterEntrypoint.startCluster();
+               } catch (ClusterEntrypointException e) {
+                       LOG.error(String.format("Could not start cluster 
entrypoint %s.", clusterEntrypointName), e);
+                       System.exit(STARTUP_FAILURE_RETURN_CODE);
+               }
+
+               
clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, 
throwable) -> {
+                       final int returnCode;
+
+                       if (throwable != null) {
+                               returnCode = RUNTIME_FAILURE_RETURN_CODE;
+                       } else {
+                               returnCode = 
applicationStatus.processExitCode();
+                       }
+
+                       LOG.info("Terminating cluster entrypoint process {} 
with exit code {}.", clusterEntrypointName, returnCode, throwable);
+                       System.exit(returnCode);
+               });
+       }
+
        /**
         * Execution mode of the {@link MiniDispatcher}.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
new file mode 100644
index 00000000000..21d37ee01ee
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exceptions thrown by the {@link ClusterEntrypoint}.
+ */
+public class ClusterEntrypointException extends FlinkException {
+       private static final long serialVersionUID = -3855286807063809945L;
+
+       public ClusterEntrypointException(String message) {
+               super(message);
+       }
+
+       public ClusterEntrypointException(Throwable cause) {
+               super(cause);
+       }
+
+       public ClusterEntrypointException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 80a9da278c1..0426df9aabd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -19,35 +19,9 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for per-job cluster entry points.
@@ -58,82 +32,10 @@ public JobClusterEntrypoint(Configuration configuration) {
                super(configuration);
        }
 
-       @Override
-       protected MiniDispatcherRestEndpoint createRestEndpoint(
-                       Configuration configuration,
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                       TransientBlobService transientBlobService,
-                       Executor executor,
-                       MetricQueryServiceRetriever metricQueryServiceRetriever,
-                       LeaderElectionService leaderElectionService) throws 
Exception {
-               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
-
-               return new MiniDispatcherRestEndpoint(
-                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
-                       dispatcherGatewayRetriever,
-                       configuration,
-                       restHandlerConfiguration,
-                       resourceManagerGatewayRetriever,
-                       transientBlobService,
-                       executor,
-                       metricQueryServiceRetriever,
-                       leaderElectionService,
-                       this);
-       }
-
        @Override
        protected ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                        Configuration configuration,
                        ScheduledExecutor scheduledExecutor) {
                return new MemoryArchivedExecutionGraphStore();
        }
-
-       @Override
-       protected Dispatcher createDispatcher(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       ResourceManagerGateway resourceManagerGateway,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       JobManagerMetricGroup jobManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
-                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress,
-                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
-
-               final JobGraph jobGraph = retrieveJobGraph(configuration);
-
-               final String executionModeValue = 
configuration.getString(EXECUTION_MODE);
-
-               final ExecutionMode executionMode = 
ExecutionMode.valueOf(executionModeValue);
-
-               final MiniDispatcher dispatcher = new MiniDispatcher(
-                       rpcService,
-                       Dispatcher.DISPATCHER_NAME,
-                       configuration,
-                       highAvailabilityServices,
-                       resourceManagerGateway,
-                       blobServer,
-                       heartbeatServices,
-                       jobManagerMetricGroup,
-                       metricQueryServicePath,
-                       archivedExecutionGraphStore,
-                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-                       fatalErrorHandler,
-                       restAddress,
-                       historyServerArchivist,
-                       jobGraph,
-                       executionMode);
-
-               registerShutdownActions(dispatcher.getJobTerminationFuture());
-
-               return dispatcher;
-       }
-
-       protected abstract JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkException;
-
-       protected abstract void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 40eb8b76104..1fb693cf83b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -22,35 +22,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
-import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -77,62 +56,4 @@ protected ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                        scheduledExecutor,
                        Ticker.systemTicker());
        }
-
-       @Override
-       protected DispatcherRestEndpoint createRestEndpoint(
-                       Configuration configuration,
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                       TransientBlobService transientBlobService,
-                       Executor executor,
-                       MetricQueryServiceRetriever metricQueryServiceRetriever,
-                       LeaderElectionService leaderElectionService) throws 
Exception {
-
-               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
-
-               return new DispatcherRestEndpoint(
-                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
-                       dispatcherGatewayRetriever,
-                       configuration,
-                       restHandlerConfiguration,
-                       resourceManagerGatewayRetriever,
-                       transientBlobService,
-                       executor,
-                       metricQueryServiceRetriever,
-                       leaderElectionService,
-                       this);
-       }
-
-       @Override
-       protected Dispatcher createDispatcher(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       ResourceManagerGateway resourceManagerGateway,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       JobManagerMetricGroup jobManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
-                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress,
-                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
-
-               // create the default dispatcher
-               return new StandaloneDispatcher(
-                       rpcService,
-                       Dispatcher.DISPATCHER_NAME,
-                       configuration,
-                       highAvailabilityServices,
-                       resourceManagerGateway,
-                       blobServer,
-                       heartbeatServices,
-                       jobManagerMetricGroup,
-                       metricQueryServicePath,
-                       archivedExecutionGraphStore,
-                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-                       fatalErrorHandler,
-                       restAddress,
-                       historyServerArchivist);
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index edff87b783d..127fc8b9831 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,24 +19,14 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
-import javax.annotation.Nullable;
-
 /**
  * Entry point for the standalone session cluster.
  */
@@ -47,33 +37,8 @@ public StandaloneSessionClusterEntrypoint(Configuration 
configuration) {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       resourceManagerRuntimeServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new StandaloneResourceManager(
-                       rpcService,
-                       FlinkResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       resourceManagerRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       resourceManagerRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler);
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new 
SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
        }
 
        public static void main(String[] args) {
@@ -97,6 +62,6 @@ public static void main(String[] args) {
 
                StandaloneSessionClusterEntrypoint entrypoint = new 
StandaloneSessionClusterEntrypoint(configuration);
 
-               entrypoint.startCluster();
+               ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..0a374117841
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstract class which implements the creation of the {@link 
DispatcherResourceManagerComponent} components.
+ *
+ * @param <T> type of the {@link Dispatcher}
+ * @param <U> type of the {@link RestfulGateway} given to the {@link 
WebMonitorEndpoint}
+ */
+public abstract class AbstractDispatcherResourceManagerComponentFactory<T 
extends Dispatcher, U extends RestfulGateway> implements 
DispatcherResourceManagerComponentFactory<T> {
+
+       private final Logger log = LoggerFactory.getLogger(getClass());
+
+       @Nonnull
+       private final DispatcherFactory<T> dispatcherFactory;
+
+       @Nonnull
+       private final ResourceManagerFactory<?> resourceManagerFactory;
+
+       @Nonnull
+       private final RestEndpointFactory<U> restEndpointFactory;
+
+       public AbstractDispatcherResourceManagerComponentFactory(
+                       @Nonnull DispatcherFactory<T> dispatcherFactory,
+                       @Nonnull ResourceManagerFactory<?> 
resourceManagerFactory,
+                       @Nonnull RestEndpointFactory<U> restEndpointFactory) {
+               this.dispatcherFactory = dispatcherFactory;
+               this.resourceManagerFactory = resourceManagerFactory;
+               this.restEndpointFactory = restEndpointFactory;
+       }
+
+       @Override
+       public DispatcherResourceManagerComponent<T> create(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+
+               LeaderRetrievalService dispatcherLeaderRetrievalService = null;
+               LeaderRetrievalService resourceManagerRetrievalService = null;
+               WebMonitorEndpoint<U> webMonitorEndpoint = null;
+               ResourceManager<?> resourceManager = null;
+               JobManagerMetricGroup jobManagerMetricGroup = null;
+               T dispatcher = null;
+
+               try {
+                       dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
+
+                       resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+                       final LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+                               rpcService,
+                               DispatcherGateway.class,
+                               DispatcherId::fromUuid,
+                               10,
+                               Time.milliseconds(50L));
+
+                       final LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+                               rpcService,
+                               ResourceManagerGateway.class,
+                               ResourceManagerId::fromUuid,
+                               10,
+                               Time.milliseconds(50L));
+
+                       // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
+                       final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
+                       final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+                       webMonitorEndpoint = 
restEndpointFactory.createRestEndpoint(
+                               configuration,
+                               dispatcherGatewayRetriever,
+                               resourceManagerGatewayRetriever,
+                               blobServer,
+                               rpcService.getExecutor(),
+                               new AkkaQueryServiceRetriever(actorSystem, 
timeout),
+                               
highAvailabilityServices.getWebMonitorLeaderElectionService(),
+                               fatalErrorHandler);
+
+                       log.debug("Starting Dispatcher REST endpoint.");
+                       webMonitorEndpoint.start();
+
+                       resourceManager = 
resourceManagerFactory.createResourceManager(
+                               configuration,
+                               ResourceID.generate(),
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               metricRegistry,
+                               fatalErrorHandler,
+                               new ClusterInformation(rpcService.getAddress(), 
blobServer.getPort()),
+                               webMonitorEndpoint.getRestBaseUrl());
+
+                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
+                               metricRegistry,
+                               rpcService.getAddress(),
+                               
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
+                       final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
+
+                       dispatcher = dispatcherFactory.createDispatcher(
+                               configuration,
+                               rpcService,
+                               highAvailabilityServices,
+                               
resourceManager.getSelfGateway(ResourceManagerGateway.class),
+                               blobServer,
+                               heartbeatServices,
+                               jobManagerMetricGroup,
+                               metricRegistry.getMetricQueryServicePath(),
+                               archivedExecutionGraphStore,
+                               fatalErrorHandler,
+                               webMonitorEndpoint.getRestBaseUrl(),
+                               historyServerArchivist);
+
+                       log.debug("Starting ResourceManager.");
+                       resourceManager.start();
+                       
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
+
+                       log.debug("Starting Dispatcher.");
+                       dispatcher.start();
+                       
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+
+                       return createDispatcherResourceManagerComponent(
+                               dispatcher,
+                               resourceManager,
+                               dispatcherLeaderRetrievalService,
+                               resourceManagerRetrievalService,
+                               webMonitorEndpoint,
+                               jobManagerMetricGroup);
+
+               } catch (Exception exception) {
+                       // clean up all started components
+                       if (dispatcherLeaderRetrievalService != null) {
+                               try {
+                                       dispatcherLeaderRetrievalService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+                       }
+
+                       if (resourceManagerRetrievalService != null) {
+                               try {
+                                       resourceManagerRetrievalService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+                       }
+
+                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(3);
+
+                       if (webMonitorEndpoint != null) {
+                               
terminationFutures.add(webMonitorEndpoint.closeAsync());
+                       }
+
+                       if (resourceManager != null) {
+                               resourceManager.shutDown();
+                               
terminationFutures.add(resourceManager.getTerminationFuture());
+                       }
+
+                       if (dispatcher != null) {
+                               dispatcher.shutDown();
+                               
terminationFutures.add(dispatcher.getTerminationFuture());
+                       }
+
+                       final FutureUtils.ConjunctFuture<Void> 
terminationFuture = FutureUtils.completeAll(terminationFutures);
+
+                       try {
+                               terminationFuture.get();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       if (jobManagerMetricGroup != null) {
+                               jobManagerMetricGroup.close();
+                       }
+
+                       throw new FlinkException("Could not create the 
DispatcherResourceManagerComponent.", exception);
+               }
+       }
+
+       protected abstract DispatcherResourceManagerComponent<T> 
createDispatcherResourceManagerComponent(
+               T dispatcher,
+               ResourceManager<?> resourceManager,
+               LeaderRetrievalService dispatcherLeaderRetrievalService,
+               LeaderRetrievalService resourceManagerRetrievalService,
+               WebMonitorEndpoint<?> webMonitorEndpoint,
+               JobManagerMetricGroup jobManagerMetricGroup);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..94925b2aba0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class DispatcherResourceManagerComponent<T extends Dispatcher> 
implements AutoCloseableAsync {
+
+       @Nonnull
+       private final T dispatcher;
+
+       @Nonnull
+       private final ResourceManager<?> resourceManager;
+
+       @Nonnull
+       private final LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+       @Nonnull
+       private final LeaderRetrievalService resourceManagerRetrievalService;
+
+       @Nonnull
+       private final WebMonitorEndpoint<?> webMonitorEndpoint;
+
+       @Nonnull
+       private final JobManagerMetricGroup jobManagerMetricGroup;
+
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+       private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+       DispatcherResourceManagerComponent(
+                       @Nonnull T dispatcher,
+                       @Nonnull ResourceManager<?> resourceManager,
+                       @Nonnull LeaderRetrievalService 
dispatcherLeaderRetrievalService,
+                       @Nonnull LeaderRetrievalService 
resourceManagerRetrievalService,
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+               this.resourceManager = resourceManager;
+               this.dispatcher = dispatcher;
+               this.dispatcherLeaderRetrievalService = 
dispatcherLeaderRetrievalService;
+               this.resourceManagerRetrievalService = 
resourceManagerRetrievalService;
+               this.webMonitorEndpoint = webMonitorEndpoint;
+               this.jobManagerMetricGroup = jobManagerMetricGroup;
+               this.terminationFuture = new CompletableFuture<>();
+               this.shutDownFuture = new CompletableFuture<>();
+
+               registerShutDownFuture();
+       }
+
+       private void registerShutDownFuture() {
+               terminationFuture.whenComplete(
+                       (aVoid, throwable) -> {
+                               if (throwable != null) {
+                                       
shutDownFuture.completeExceptionally(throwable);
+                               } else {
+                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                               }
+                       });
+
+               dispatcher
+                       .getTerminationFuture()
+                       .whenComplete(
+                               (aVoid, throwable) -> {
+                                       if (throwable != null) {
+                                               
shutDownFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                                       }
+                               });
+       }
+
+       public CompletableFuture<Void> getTerminationFuture() {
+               return terminationFuture;
+       }
+
+       public final CompletableFuture<ApplicationStatus> getShutDownFuture() {
+               return shutDownFuture;
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               if (isRunning.compareAndSet(true, false)) {
+                       Exception exception = null;
+
+                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
+
+                       try {
+                               dispatcherLeaderRetrievalService.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       try {
+                               resourceManagerRetrievalService.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       terminationFutures.add(webMonitorEndpoint.closeAsync());
+
+                       dispatcher.shutDown();
+                       
terminationFutures.add(dispatcher.getTerminationFuture());
+
+                       resourceManager.shutDown();
+                       
terminationFutures.add(resourceManager.getTerminationFuture());
+
+                       if (exception != null) {
+                               
terminationFutures.add(FutureUtils.completedExceptionally(exception));
+                       }
+
+                       final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+                       final CompletableFuture<Void> 
metricGroupTerminationFuture = FutureUtils.runAfterwards(
+                               componentTerminationFuture,
+                               jobManagerMetricGroup::close);
+
+                       metricGroupTerminationFuture.whenComplete((aVoid, 
throwable) -> {
+                               if (throwable != null) {
+                                       
terminationFuture.completeExceptionally(throwable);
+                               } else {
+                                       terminationFuture.complete(aVoid);
+                               }
+                       });
+               }
+
+               return terminationFuture;
+       }
+
+       /**
+        * Deregister the Flink application from the resource management system 
by signalling
+        * the {@link ResourceManager}.
+        *
+        * @param applicationStatus to terminate the application with
+        * @param diagnostics additional information about the shut down, can 
be {@code null}
+        * @return Future which is completed once the shut down
+        */
+       public CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
+               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
+               return selfGateway.deregisterApplication(applicationStatus, 
diagnostics).thenApply(ack -> null);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..df22a59d955
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for the {@link DispatcherResourceManagerComponent}.
+ */
+public interface DispatcherResourceManagerComponentFactory<T extends 
Dispatcher> {
+
+       DispatcherResourceManagerComponent<T> create(
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               BlobServer blobServer,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               ArchivedExecutionGraphStore archivedExecutionGraphStore,
+               FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
new file mode 100644
index 00000000000..1848408e9fa
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * {@link JobGraphRetriever} implementation which retrieves the {@link 
JobGraph} from
+ * a file on disk.
+ */
+public class FileJobGraphRetriever implements JobGraphRetriever {
+
+       public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = 
ConfigOptions
+               .key("internal.jobgraph-path")
+               .defaultValue("job.graph");
+
+       @Nonnull
+       private final String jobGraphFile;
+
+       public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
+               this.jobGraphFile = jobGraphFile;
+       }
+
+       @Override
+       public JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
+               File fp = new File(jobGraphFile);
+
+               try (FileInputStream input = new FileInputStream(fp);
+                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
+
+                       return (JobGraph) obInput.readObject();
+               } catch (FileNotFoundException e) {
+                       throw new FlinkException("Could not find the JobGraph 
file.", e);
+               } catch (ClassNotFoundException | IOException e) {
+                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
+               }
+       }
+
+       public static FileJobGraphRetriever createFrom(Configuration 
configuration) {
+               return new 
FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..c1df47fe98e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link DispatcherResourceManagerComponent} for a job cluster. The 
dispatcher component starts
+ * a {@link MiniDispatcher}.
+ */
+class JobDispatcherResourceManagerComponent extends 
DispatcherResourceManagerComponent<MiniDispatcher> {
+
+       JobDispatcherResourceManagerComponent(
+                       MiniDispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               super(dispatcher, resourceManager, 
dispatcherLeaderRetrievalService, resourceManagerRetrievalService, 
webMonitorEndpoint, jobManagerMetricGroup);
+
+               final CompletableFuture<ApplicationStatus> shutDownFuture = 
getShutDownFuture();
+
+               
dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, 
throwable) -> {
+                       if (throwable != null) {
+                               shutDownFuture.completeExceptionally(throwable);
+                       } else {
+                               shutDownFuture.complete(applicationStatus);
+                       }
+               });
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..c7ce14c1db3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link 
JobDispatcherResourceManagerComponent}.
+ */
+public class JobDispatcherResourceManagerComponentFactory extends 
AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, 
RestfulGateway> {
+
+       public JobDispatcherResourceManagerComponentFactory(@Nonnull 
ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever 
jobGraphRetriever) {
+               super(new JobDispatcherFactory(jobGraphRetriever), 
resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+       }
+
+       @Override
+       protected DispatcherResourceManagerComponent<MiniDispatcher> 
createDispatcherResourceManagerComponent(
+                       MiniDispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               return new JobDispatcherResourceManagerComponent(
+                       dispatcher,
+                       resourceManager,
+                       dispatcherLeaderRetrievalService,
+                       resourceManagerRetrievalService,
+                       webMonitorEndpoint,
+                       jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
new file mode 100644
index 00000000000..b1586ea1afe
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Interface which allows to retrieve the {@link JobGraph}.
+ */
+public interface JobGraphRetriever {
+
+       /**
+        * Retrieve the {@link JobGraph}.
+        *
+        * @param configuration cluster configuration
+        * @return the retrieved {@link JobGraph}.
+        * @throws FlinkException if the {@link JobGraph} could not be retrieved
+        */
+       JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..8be7b74c86f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+/**
+ * {@link DispatcherResourceManagerComponent} used by session clusters.
+ */
+class SessionDispatcherResourceManagerComponent extends 
DispatcherResourceManagerComponent<Dispatcher> {
+       SessionDispatcherResourceManagerComponent(
+                       Dispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               super(dispatcher, resourceManager, 
dispatcherLeaderRetrievalService, resourceManagerRetrievalService, 
webMonitorEndpoint, jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..c44833df2ff
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link 
SessionDispatcherResourceManagerComponent}.
+ */
+public class SessionDispatcherResourceManagerComponentFactory extends 
AbstractDispatcherResourceManagerComponentFactory<Dispatcher, 
DispatcherGateway> {
+
+       public SessionDispatcherResourceManagerComponentFactory(@Nonnull 
ResourceManagerFactory<?> resourceManagerFactory) {
+               super(SessionDispatcherFactory.INSTANCE, 
resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+       }
+
+       @Override
+       protected DispatcherResourceManagerComponent<Dispatcher> 
createDispatcherResourceManagerComponent(
+                       Dispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               return new SessionDispatcherResourceManagerComponent(
+                       dispatcher,
+                       resourceManager,
+                       dispatcherLeaderRetrievalService,
+                       resourceManagerRetrievalService,
+                       webMonitorEndpoint,
+                       jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 00000000000..91a7b267594
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManager} factory.
+ *
+ * @param <T> type of the workers of the ResourceManager
+ */
+public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {
+
+       ResourceManager<T> createResourceManager(
+               Configuration configuration,
+               ResourceID resourceId,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler,
+               ClusterInformation clusterInformation,
+               @Nullable String webInterfaceUrl) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
new file mode 100644
index 00000000000..c8e314f659c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link 
StandaloneResourceManager}.
+ */
+public enum StandaloneResourceManagerFactory implements 
ResourceManagerFactory<ResourceID> {
+       INSTANCE;
+
+       @Override
+       public ResourceManager<ResourceID> createResourceManager(
+                       Configuration configuration,
+                       ResourceID resourceId,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       FatalErrorHandler fatalErrorHandler,
+                       ClusterInformation clusterInformation,
+                       @Nullable String webInterfaceUrl) throws Exception {
+               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       resourceManagerRuntimeServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new StandaloneResourceManager(
+                       rpcService,
+                       FlinkResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       resourceManagerRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       resourceManagerRuntimeServices.getJobLeaderIdService(),
+                       clusterInformation,
+                       fatalErrorHandler);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
new file mode 100644
index 00000000000..da4b0633f40
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link 
MiniDispatcherRestEndpoint}.
+ */
+public enum JobRestEndpointFactory implements 
RestEndpointFactory<RestfulGateway> {
+       INSTANCE;
+
+       @Override
+       public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
+                       Configuration configuration,
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+                       TransientBlobService transientBlobService,
+                       Executor executor,
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
+
+               return new MiniDispatcherRestEndpoint(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       dispatcherGatewayRetriever,
+                       configuration,
+                       restHandlerConfiguration,
+                       resourceManagerGatewayRetriever,
+                       transientBlobService,
+                       executor,
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       fatalErrorHandler);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
new file mode 100644
index 00000000000..ffdc0cbc39e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link WebMonitorEndpoint} factory.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ */
+public interface RestEndpointFactory<T extends RestfulGateway> {
+
+       WebMonitorEndpoint<T> createRestEndpoint(
+               Configuration configuration,
+               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+               LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+               TransientBlobService transientBlobService,
+               Executor executor,
+               MetricQueryServiceRetriever metricQueryServiceRetriever,
+               LeaderElectionService leaderElectionService,
+               FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
new file mode 100644
index 00000000000..359efbfa18e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}.
+ */
+public enum SessionRestEndpointFactory implements 
RestEndpointFactory<DispatcherGateway> {
+       INSTANCE;
+
+       @Override
+       public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
+                       Configuration configuration,
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+                       TransientBlobService transientBlobService,
+                       Executor executor,
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
+
+               return new DispatcherRestEndpoint(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       dispatcherGatewayRetriever,
+                       configuration,
+                       restHandlerConfiguration,
+                       resourceManagerGatewayRetriever,
+                       transientBlobService,
+                       executor,
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       fatalErrorHandler);
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 482e2b70cb6..42e666eda88 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,48 +19,28 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 
-       /** The job graph file path. */
-       public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
        private final String workingDirectory;
 
        public YarnJobClusterEntrypoint(
@@ -82,58 +62,10 @@ protected String getRPCPortRange(Configuration 
configuration) {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new YarnResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       configuration,
-                       System.getenv(),
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       webInterfaceUrl);
-       }
-
-       @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-               File fp = new File(jobGraphFile);
-
-               try (FileInputStream input = new FileInputStream(fp);
-                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
-
-                       return (JobGraph) obInput.readObject();
-               } catch (FileNotFoundException e) {
-                       throw new FlinkException("Could not find the JobGraph 
file.", e);
-               } catch (ClassNotFoundException | IOException e) {
-                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
-               }
-       }
-
-       @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{
-               terminationFuture.thenAccept((status) ->
-                       shutDownAndTerminate(status.processExitCode(), status, 
null, true));
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
+                       YarnResourceManagerFactory.INSTANCE,
+                       FileJobGraphRetriever.createFrom(configuration));
        }
 
        // 
------------------------------------------------------------------------
@@ -167,6 +99,6 @@ public static void main(String[] args) {
                        configuration,
                        workingDirectory);
 
-               yarnJobClusterEntrypoint.startCluster();
+               
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
        }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
new file mode 100644
index 00000000000..bfd1b4a07ed
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.yarn.YarnResourceManager;
+import org.apache.flink.yarn.YarnWorkerNode;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} implementation which creates a {@link 
YarnResourceManager}.
+ */
+public enum YarnResourceManagerFactory implements 
ResourceManagerFactory<YarnWorkerNode> {
+       INSTANCE;
+
+       @Override
+       public ResourceManager<YarnWorkerNode> createResourceManager(
+                       Configuration configuration,
+                       ResourceID resourceId,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       FatalErrorHandler fatalErrorHandler,
+                       ClusterInformation clusterInformation,
+                       @Nullable String webInterfaceUrl) throws Exception {
+               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       rmServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new YarnResourceManager(
+                       rpcService,
+                       ResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       configuration,
+                       System.getenv(),
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       rmRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       rmRuntimeServices.getJobLeaderIdService(),
+                       clusterInformation,
+                       fatalErrorHandler,
+                       webInterfaceUrl);
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 6c928611910..0f4656e62a4 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,29 +19,19 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -70,36 +60,8 @@ protected String getRPCPortRange(Configuration 
configuration) {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new YarnResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       configuration,
-                       System.getenv(),
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       webInterfaceUrl);
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new 
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
        }
 
        public static void main(String[] args) {
@@ -128,6 +90,6 @@ public static void main(String[] args) {
                        configuration,
                        workingDirectory);
 
-               yarnSessionClusterEntrypoint.startCluster();
+               
ClusterEntrypoint.runClusterEntrypoint(yarnSessionClusterEntrypoint);
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to