This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1fcbc9f816460b1a159fd79b4c9bcf83327d79c3
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Sep 23 13:39:34 2018 +0200

    [FLINK-10411] Make ClusterEntrypoint more compositional
    
    Introduce a ClusterComponent which encapsulates the logic for starting the
    cluster components: the Dispatcher, the RestServerEndpoint and the
    ResourceManager. The individual components are created by using a factory
    instance. The ClusterEntrypoint is now only responsible for managing the
    required services needed by the ClusterComponent. This design should make
    the testing of these components easier, improve reusability and reduce
    code duplication.
---
 .../entrypoint/ClassPathJobGraphRetriever.java     |  80 +++++
 .../entrypoint/StandaloneJobClusterEntryPoint.java |  85 +----
 ...st.java => ClassPathJobGraphRetrieverTest.java} |  18 +-
 .../apache/flink/container/entrypoint/TestJob.java |   2 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  89 +-----
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  61 +---
 .../MesosResourceManagerFactory.java               |  90 ++++++
 .../runtime/dispatcher/DispatcherFactory.java      |  53 ++++
 .../runtime/dispatcher/JobDispatcherFactory.java   |  86 +++++
 .../dispatcher/SessionDispatcherFactory.java       |  69 ++++
 .../flink/runtime/entrypoint/ClusterComponent.java | 347 +++++++++++++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java      | 284 +++--------------
 .../runtime/entrypoint/FileJobGraphRetriever.java  |  70 +++++
 .../runtime/entrypoint/JobClusterComponent.java    |  51 +++
 .../runtime/entrypoint/JobClusterEntrypoint.java   |  98 ------
 .../runtime/entrypoint/JobGraphRetriever.java      |  38 +++
 .../entrypoint/SessionClusterComponent.java        |  34 ++
 .../entrypoint/SessionClusterEntrypoint.java       |  79 -----
 .../StandaloneSessionClusterEntrypoint.java        |  43 +--
 .../resourcemanager/ResourceManagerFactory.java    |  50 +++
 .../StandaloneResourceManagerFactory.java}         |  46 +--
 .../flink/runtime/rest/JobRestEndpointFactory.java |  66 ++++
 .../flink/runtime/rest/RestEndpointFactory.java    |  50 +++
 .../runtime/rest/SessionRestEndpointFactory.java   |  65 ++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  83 +----
 ...ypoint.java => YarnResourceManagerFactory.java} |  69 +---
 .../entrypoint/YarnSessionClusterEntrypoint.java   |  47 +--
 27 files changed, 1255 insertions(+), 898 deletions(-)

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
new file mode 100644
index 0000000..5ded04b
--- /dev/null
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
+ * on the class path.
+ */
+public class ClassPathJobGraphRetriever implements JobGraphRetriever {
+
+       @Nonnull
+       private final String jobClassName;
+
+       @Nonnull
+       private final SavepointRestoreSettings savepointRestoreSettings;
+
+       @Nonnull
+       private final String[] programArguments;
+
+       public ClassPathJobGraphRetriever(
+                       @Nonnull String jobClassName,
+                       @Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
+                       @Nonnull String[] programArguments) {
+               this.jobClassName = jobClassName;
+               this.savepointRestoreSettings = savepointRestoreSettings;
+               this.programArguments = programArguments;
+       }
+
+       @Override
+       public JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
+               final PackagedProgram packagedProgram = createPackagedProgram();
+               final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+               try {
+                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
defaultParallelism);
+                       jobGraph.setAllowQueuedScheduling(true);
+                       
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+                       return jobGraph;
+               } catch (Exception e) {
+                       throw new FlinkException("Could not create the JobGraph 
from the provided user code jar.", e);
+               }
+       }
+
+       private PackagedProgram createPackagedProgram() throws FlinkException {
+               try {
+                       final Class<?> mainClass = 
getClass().getClassLoader().loadClass(jobClassName);
+                       return new PackagedProgram(mainClass, programArguments);
+               } catch (ClassNotFoundException | ProgramInvocationException e) 
{
+                       throw new FlinkException("Could not load the provided 
entrypoint class.", e);
+               }
+       }
+}
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0e40095..cdd44b5 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,38 +18,20 @@
 
 package org.apache.flink.container.entrypoint;
 
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,62 +61,10 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               final PackagedProgram packagedProgram = createPackagedProgram();
-               final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-               try {
-                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
defaultParallelism);
-                       jobGraph.setAllowQueuedScheduling(true);
-                       
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
-
-                       return jobGraph;
-               } catch (Exception e) {
-                       throw new FlinkException("Could not create the JobGraph 
from the provided user code jar.", e);
-               }
-       }
-
-       private PackagedProgram createPackagedProgram() throws FlinkException {
-               try {
-                       final Class<?> mainClass = 
getClass().getClassLoader().loadClass(jobClassName);
-                       return new PackagedProgram(mainClass, programArguments);
-               } catch (ClassNotFoundException | ProgramInvocationException e) 
{
-                       throw new FlinkException("Could not load the provided 
entrypoint class.", e);
-               }
-       }
-
-       @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{
-               terminationFuture.thenAccept((status) -> 
shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
-       }
-
-       @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       resourceManagerRuntimeServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new StandaloneResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       resourceManagerRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       resourceManagerRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler);
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new JobClusterComponent(
+                       StandaloneResourceManagerFactory.INSTANCE,
+                       new ClassPathJobGraphRetriever(jobClassName, 
savepointRestoreSettings, programArguments));
        }
 
        public static void main(String[] args) {
@@ -166,5 +96,4 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
 
                entrypoint.startCluster();
        }
-
 }
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
similarity index 81%
rename from 
flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
rename to 
flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 2faec4c..6e460e1 100644
--- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -32,24 +32,24 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ * Tests for the {@link ClassPathJobGraphRetriever}.
  */
-public class StandaloneJobClusterEntryPointTest extends TestLogger {
+public class ClassPathJobGraphRetrieverTest extends TestLogger {
 
        public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
        @Test
        public void testJobGraphRetrieval() throws FlinkException {
-               final Configuration configuration = new Configuration();
                final int parallelism = 42;
+               final Configuration configuration = new Configuration();
                configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
parallelism);
-               final StandaloneJobClusterEntryPoint 
standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-                       configuration,
+
+               final ClassPathJobGraphRetriever classPathJobGraphRetriever = 
new ClassPathJobGraphRetriever(
                        TestJob.class.getCanonicalName(),
                        SavepointRestoreSettings.none(),
                        PROGRAM_ARGUMENTS);
 
-               final JobGraph jobGraph = 
standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+               final JobGraph jobGraph = 
classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
                assertThat(jobGraph.getName(), 
is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
                assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
@@ -59,13 +59,13 @@ public class StandaloneJobClusterEntryPointTest extends 
TestLogger {
        public void testSavepointRestoreSettings() throws FlinkException {
                final Configuration configuration = new Configuration();
                final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath("foobar", true);
-               final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new 
StandaloneJobClusterEntryPoint(
-                       configuration,
+
+               final ClassPathJobGraphRetriever classPathJobGraphRetriever = 
new ClassPathJobGraphRetriever(
                        TestJob.class.getCanonicalName(),
                        savepointRestoreSettings,
                        PROGRAM_ARGUMENTS);
 
-               final JobGraph jobGraph = 
jobClusterEntryPoint.retrieveJobGraph(configuration);
+               final JobGraph jobGraph = 
classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
                assertThat(jobGraph.getSavepointRestoreSettings(), 
is(equalTo(savepointRestoreSettings)));
        }
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index ada434d..27d1f32 100644
--- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 /**
- * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ * Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
  */
 public class TestJob {
 
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 38b61d8..ed2175a 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,31 +20,21 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -52,13 +42,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
-       public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
        // 
------------------------------------------------------------------------
        //  Command-line options
        // 
------------------------------------------------------------------------
@@ -114,58 +95,6 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new MesosResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       configuration,
-                       mesosServices,
-                       schedulerConfiguration,
-                       taskManagerParameters,
-                       taskManagerContainerSpec,
-                       webInterfaceUrl);
-       }
-
-       @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-               File fp = new File(jobGraphFile);
-
-               try (FileInputStream input = new FileInputStream(fp);
-                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
-
-                       return (JobGraph) obInput.readObject();
-               } catch (FileNotFoundException e) {
-                       throw new FlinkException("Could not find the JobGraph 
file.", e);
-               } catch (ClassNotFoundException | IOException e) {
-                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
-               }
-       }
-
-       @Override
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                final CompletableFuture<Void> serviceShutDownFuture = 
super.stopClusterServices(cleanupHaData);
 
@@ -179,7 +108,15 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{}
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new JobClusterComponent(
+                       new MesosResourceManagerFactory(
+                               mesosServices,
+                               schedulerConfiguration,
+                               taskManagerParameters,
+                               taskManagerContainerSpec),
+                       FileJobGraphRetriever.createFrom(configuration));
+       }
 
        public static void main(String[] args) {
                // startup checks and logging
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 70369f6..0cc3053 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,25 +20,17 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -49,8 +41,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -104,42 +94,6 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new MesosResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       configuration,
-                       mesosServices,
-                       mesosConfig,
-                       taskManagerParameters,
-                       taskManagerContainerSpec,
-                       webInterfaceUrl);
-       }
-
-       @Override
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                final CompletableFuture<Void> serviceShutDownFuture = 
super.stopClusterServices(cleanupHaData);
 
@@ -152,6 +106,15 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        });
        }
 
+       @Override
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new SessionClusterComponent(new 
MesosResourceManagerFactory(
+                       mesosServices,
+                       mesosConfig,
+                       taskManagerParameters,
+                       taskManagerContainerSpec));
+       }
+
        public static void main(String[] args) {
                // startup checks and logging
                EnvironmentInformation.logEnvironmentInfo(LOG, 
MesosSessionClusterEntrypoint.class.getSimpleName(), args);
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
new file mode 100644
index 0000000..9582e9f
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
+ */
+public class MesosResourceManagerFactory implements 
ResourceManagerFactory<RegisteredMesosWorkerNode> {
+
+       @Nonnull
+       private final MesosServices mesosServices;
+
+       @Nonnull
+       private final MesosConfiguration schedulerConfiguration;
+
+       @Nonnull
+       private final MesosTaskManagerParameters taskManagerParameters;
+
+       @Nonnull
+       private final ContainerSpecification taskManagerContainerSpec;
+
+       public MesosResourceManagerFactory(@Nonnull MesosServices 
mesosServices, @Nonnull MesosConfiguration schedulerConfiguration, @Nonnull 
MesosTaskManagerParameters taskManagerParameters, @Nonnull 
ContainerSpecification taskManagerContainerSpec) {
+               this.mesosServices = mesosServices;
+               this.schedulerConfiguration = schedulerConfiguration;
+               this.taskManagerParameters = taskManagerParameters;
+               this.taskManagerContainerSpec = taskManagerContainerSpec;
+       }
+
+       @Override
+       public ResourceManager<RegisteredMesosWorkerNode> 
createResourceManager(Configuration configuration, ResourceID resourceId, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, 
FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, 
@Nullable String webInterfaceUrl) throws Exception {
+               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       rmServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new MesosResourceManager(
+                       rpcService,
+                       ResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       rmRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       rmRuntimeServices.getJobLeaderIdService(),
+                       clusterInformation,
+                       fatalErrorHandler,
+                       configuration,
+                       mesosServices,
+                       schedulerConfiguration,
+                       taskManagerParameters,
+                       taskManagerContainerSpec,
+                       webInterfaceUrl);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
new file mode 100644
index 0000000..5952299
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} factory interface.
+ */
+public interface DispatcherFactory<T extends Dispatcher> {
+
+       /**
+        * Create a {@link Dispatcher} of the given type {@code T}.
+        */
+       T createDispatcher(
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               ResourceManagerGateway resourceManagerGateway,
+               BlobServer blobServer,
+               HeartbeatServices heartbeatServices,
+               JobManagerMetricGroup jobManagerMetricGroup,
+               @Nullable String metricQueryServicePath,
+               ArchivedExecutionGraphStore archivedExecutionGraphStore,
+               FatalErrorHandler fatalErrorHandler,
+               @Nullable String restAddress,
+               HistoryServerArchivist historyServerArchivist) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
new file mode 100644
index 0000000..488f2fc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import static 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link MiniDispatcher}.
+ */
+public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> 
{
+
+       private final JobGraphRetriever jobGraphRetriever;
+
+       public JobDispatcherFactory(JobGraphRetriever jobGraphRetriever) {
+               this.jobGraphRetriever = jobGraphRetriever;
+       }
+
+       @Override
+       public MiniDispatcher createDispatcher(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       ResourceManagerGateway resourceManagerGateway,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
+                       FatalErrorHandler fatalErrorHandler,
+                       @Nullable String restAddress,
+                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
+               final JobGraph jobGraph = 
jobGraphRetriever.retrieveJobGraph(configuration);
+
+               final String executionModeValue = 
configuration.getString(EXECUTION_MODE);
+
+               final ClusterEntrypoint.ExecutionMode executionMode = 
ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
+
+               return new MiniDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME,
+                       configuration,
+                       highAvailabilityServices,
+                       resourceManagerGateway,
+                       blobServer,
+                       heartbeatServices,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
+                       archivedExecutionGraphStore,
+                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+                       fatalErrorHandler,
+                       restAddress,
+                       historyServerArchivist,
+                       jobGraph,
+                       executionMode);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
new file mode 100644
index 0000000..18e29a0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
+ */
+public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
+       INSTANCE;
+
+       @Override
+       public Dispatcher createDispatcher(
+                               Configuration configuration,
+                               RpcService rpcService,
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               ResourceManagerGateway resourceManagerGateway,
+                               BlobServer blobServer,
+                               HeartbeatServices heartbeatServices,
+                               JobManagerMetricGroup jobManagerMetricGroup,
+                               @Nullable String metricQueryServicePath,
+                               ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
+                               FatalErrorHandler fatalErrorHandler,
+                               @Nullable String restAddress,
+                               HistoryServerArchivist historyServerArchivist) 
throws Exception {
+               // create the default dispatcher
+               return new StandaloneDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME,
+                       configuration,
+                       highAvailabilityServices,
+                       resourceManagerGateway,
+                       blobServer,
+                       heartbeatServices,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
+                       archivedExecutionGraphStore,
+                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+                       fatalErrorHandler,
+                       restAddress,
+                       historyServerArchivist);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
new file mode 100644
index 0000000..af02729
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent<T extends Dispatcher> implements 
AutoCloseableAsync {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ClusterComponent.class);
+
+       private final Object lock = new Object();
+
+       private final DispatcherFactory<T> dispatcherFactory;
+
+       private final ResourceManagerFactory<?> resourceManagerFactory;
+
+       private final RestEndpointFactory<?> restEndpointFactory;
+
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+       @GuardedBy("lock")
+       private State state;
+
+       @GuardedBy("lock")
+       private ResourceManager<?> resourceManager;
+
+       @GuardedBy("lock")
+       private T dispatcher;
+
+       @GuardedBy("lock")
+       private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+       @GuardedBy("lock")
+       private LeaderRetrievalService resourceManagerRetrievalService;
+
+       @GuardedBy("lock")
+       private WebMonitorEndpoint<?> webMonitorEndpoint;
+
+       @GuardedBy("lock")
+       private JobManagerMetricGroup jobManagerMetricGroup;
+
+       public ClusterComponent(
+                       DispatcherFactory<T> dispatcherFactory,
+                       ResourceManagerFactory<?> resourceManagerFactory,
+                       RestEndpointFactory<?> restEndpointFactory) {
+               this.dispatcherFactory = dispatcherFactory;
+               this.resourceManagerFactory = resourceManagerFactory;
+               this.restEndpointFactory = restEndpointFactory;
+               this.terminationFuture = new CompletableFuture<>();
+               this.shutDownFuture = new CompletableFuture<>();
+               this.state = State.CREATED;
+
+               terminationFuture.whenComplete(
+                       (aVoid, throwable) -> {
+                               if (throwable != null) {
+                                       
shutDownFuture.completeExceptionally(throwable);
+                               } else {
+                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                               }
+                       });
+       }
+
+       public T getDispatcher() {
+               synchronized (lock) {
+                       return dispatcher;
+               }
+       }
+
+       public CompletableFuture<Void> getTerminationFuture() {
+               return terminationFuture;
+       }
+
+       public CompletableFuture<ApplicationStatus> getShutDownFuture() {
+               return shutDownFuture;
+       }
+
+       public void startComponent(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+               synchronized (lock) {
+                       Preconditions.checkState(state == State.CREATED);
+                       state = State.RUNNING;
+
+                       dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
+
+                       resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+                               rpcService,
+                               DispatcherGateway.class,
+                               DispatcherId::fromUuid,
+                               10,
+                               Time.milliseconds(50L));
+
+                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+                               rpcService,
+                               ResourceManagerGateway.class,
+                               ResourceManagerId::fromUuid,
+                               10,
+                               Time.milliseconds(50L));
+
+                       // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
+                       final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
+                       final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+                       webMonitorEndpoint = 
restEndpointFactory.createRestEndpoint(
+                               configuration,
+                               dispatcherGatewayRetriever,
+                               resourceManagerGatewayRetriever,
+                               blobServer,
+                               rpcService.getExecutor(),
+                               new AkkaQueryServiceRetriever(actorSystem, 
timeout),
+                               
highAvailabilityServices.getWebMonitorLeaderElectionService(),
+                               fatalErrorHandler);
+
+                       LOG.debug("Starting Dispatcher REST endpoint.");
+                       webMonitorEndpoint.start();
+
+                       resourceManager = 
resourceManagerFactory.createResourceManager(
+                               configuration,
+                               ResourceID.generate(),
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               metricRegistry,
+                               fatalErrorHandler,
+                               new ClusterInformation(rpcService.getAddress(), 
blobServer.getPort()),
+                               webMonitorEndpoint.getRestBaseUrl());
+
+                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
+                               metricRegistry,
+                               rpcService.getAddress(),
+                               
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
+                       final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
+
+                       dispatcher = dispatcherFactory.createDispatcher(
+                               configuration,
+                               rpcService,
+                               highAvailabilityServices,
+                               
resourceManager.getSelfGateway(ResourceManagerGateway.class),
+                               blobServer,
+                               heartbeatServices,
+                               jobManagerMetricGroup,
+                               metricRegistry.getMetricQueryServicePath(),
+                               archivedExecutionGraphStore,
+                               fatalErrorHandler,
+                               webMonitorEndpoint.getRestBaseUrl(),
+                               historyServerArchivist);
+
+                       registerShutDownFuture(dispatcher, shutDownFuture);
+
+                       LOG.debug("Starting ResourceManager.");
+                       resourceManager.start();
+                       
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
+
+                       LOG.debug("Starting Dispatcher.");
+                       dispatcher.start();
+                       
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+               }
+       }
+
+       protected void registerShutDownFuture(T dispatcher, 
CompletableFuture<ApplicationStatus> shutDownFuture) {
+                       dispatcher
+                               .getTerminationFuture()
+                               .whenComplete(
+                                       (aVoid, throwable) -> {
+                                               if (throwable != null) {
+                                                       
shutDownFuture.completeExceptionally(throwable);
+                                               } else {
+                                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                                               }
+                                       });
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               synchronized (lock) {
+                       if (state == State.RUNNING) {
+                               Exception exception = null;
+
+                               final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
+
+                               if (dispatcherLeaderRetrievalService != null) {
+                                       try {
+                                               
dispatcherLeaderRetrievalService.stop();
+                                       } catch (Exception e) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                       }
+                               }
+
+                               if (resourceManagerRetrievalService != null) {
+                                       try {
+                                               
resourceManagerRetrievalService.stop();
+                                       } catch (Exception e) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                       }
+                               }
+
+                               if (webMonitorEndpoint != null) {
+                                       
terminationFutures.add(webMonitorEndpoint.closeAsync());
+                               }
+
+                               if (dispatcher != null) {
+                                       dispatcher.shutDown();
+                                       
terminationFutures.add(dispatcher.getTerminationFuture());
+                               }
+
+                               if (resourceManager != null) {
+                                       resourceManager.shutDown();
+                                       
terminationFutures.add(resourceManager.getTerminationFuture());
+                               }
+
+                               if (exception != null) {
+                                       
terminationFutures.add(FutureUtils.completedExceptionally(exception));
+                               }
+
+                               final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+                               final CompletableFuture<Void> 
metricGroupTerminationFuture;
+
+                               if (jobManagerMetricGroup != null) {
+                                       metricGroupTerminationFuture = 
FutureUtils.runAfterwards(
+                                               componentTerminationFuture,
+                                               () -> {
+                                                       synchronized (lock) {
+                                                               
jobManagerMetricGroup.close();
+                                                       }
+                                               });
+                               } else {
+                                       metricGroupTerminationFuture = 
componentTerminationFuture;
+                               }
+
+                               
metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+                                       if (throwable != null) {
+                                               
terminationFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
terminationFuture.complete(aVoid);
+                                       }
+                               });
+                       } else if (state == State.CREATED) {
+                               terminationFuture.complete(null);
+                       }
+
+                       state = State.TERMINATED;
+                       return terminationFuture;
+               }
+       }
+
+       /**
+        * Deregister the Flink application from the resource management system 
by signalling
+        * the {@link ResourceManager}.
+        *
+        * @param applicationStatus to terminate the application with
+        * @param diagnostics additional information about the shut down, can 
be {@code null}
+        * @return Future which is completed once the application has been deregistered
+        */
+       public CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
+               synchronized (lock) {
+                       if (resourceManager != null) {
+                               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
+                               return 
selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack 
-> null);
+                       } else {
+                               return CompletableFuture.completedFuture(null);
+                       }
+               }
+       }
+
+       enum State {
+               CREATED,
+               RUNNING,
+               TERMINATED
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a8c058..39e1265 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,32 +32,19 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -65,11 +52,6 @@ import 
org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -109,7 +91,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(ClusterEntrypoint.class);
 
-       protected static final int SUCCESS_RETURN_CODE = 0;
        protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
        protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
@@ -125,6 +106,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
        @GuardedBy("lock")
+       private ClusterComponent<?> dispatcherComponent;
+
+       @GuardedBy("lock")
        private MetricRegistryImpl metricRegistry;
 
        @GuardedBy("lock")
@@ -140,32 +124,11 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private RpcService commonRpcService;
 
        @GuardedBy("lock")
-       private ResourceManager<?> resourceManager;
-
-       @GuardedBy("lock")
-       private Dispatcher dispatcher;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService resourceManagerRetrievalService;
-
-       @GuardedBy("lock")
-       private WebMonitorEndpoint<?> webMonitorEndpoint;
-
-       @GuardedBy("lock")
        private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
        @GuardedBy("lock")
        private TransientBlobCache transientBlobCache;
 
-       @GuardedBy("lock")
-       private ClusterInformation clusterInformation;
-
-       @GuardedBy("lock")
-       private JobManagerMetricGroup jobManagerMetricGroup;
-
        private final Thread shutDownHook;
 
        protected ClusterEntrypoint(Configuration configuration) {
@@ -223,7 +186,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return SecurityUtils.getInstalledContext();
        }
 
-       protected void runCluster(Configuration configuration) throws Exception 
{
+       private void runCluster(Configuration configuration) throws Exception {
                synchronized (lock) {
                        initializeServices(configuration);
 
@@ -231,27 +194,35 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
                        configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
 
-                       startClusterComponents(
+                       dispatcherComponent = 
createDispatcherComponent(configuration);
+
+                       dispatcherComponent.startComponent(
                                configuration,
                                commonRpcService,
                                haServices,
                                blobServer,
                                heartbeatServices,
-                               metricRegistry);
+                               metricRegistry,
+                               archivedExecutionGraphStore,
+                               this);
 
-                       dispatcher.getTerminationFuture().whenComplete(
-                               (Void value, Throwable throwable) -> {
+                       dispatcherComponent.getShutDownFuture().whenComplete(
+                               (ApplicationStatus applicationStatus, Throwable 
throwable) -> {
                                        if (throwable != null) {
-                                               LOG.info("Could not properly 
terminate the Dispatcher.", throwable);
+                                               shutDownAndTerminate(
+                                                       
RUNTIME_FAILURE_RETURN_CODE,
+                                                       
ApplicationStatus.UNKNOWN,
+                                                       
ExceptionUtils.stringifyException(throwable),
+                                                       false);
+                                       } else {
+                                               // This is the general shutdown 
path. If a separate more specific shutdown was
+                                               // already triggered, this will 
do nothing
+                                               shutDownAndTerminate(
+                                                       
applicationStatus.processExitCode(),
+                                                       applicationStatus,
+                                                       null,
+                                                       true);
                                        }
-
-                                       // This is the general shutdown path. 
If a separate more specific shutdown was
-                                       // already triggered, this will do 
nothing
-                                       shutDownAndTerminate(
-                                               SUCCESS_RETURN_CODE,
-                                               ApplicationStatus.SUCCEEDED,
-                                               throwable != null ? 
throwable.getMessage() : null,
-                                               true);
                                });
                }
        }
@@ -283,99 +254,11 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
 
                        archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
commonRpcService.getScheduledExecutor());
 
-                       clusterInformation = new ClusterInformation(
-                               commonRpcService.getAddress(),
-                               blobServer.getPort());
-
                        transientBlobCache = new TransientBlobCache(
                                configuration,
                                new InetSocketAddress(
-                                       
clusterInformation.getBlobServerHostname(),
-                                       
clusterInformation.getBlobServerPort()));
-               }
-       }
-
-       protected void startClusterComponents(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry) throws Exception {
-               synchronized (lock) {
-                       dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
-
-                       resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
-
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
-                               rpcService,
-                               DispatcherGateway.class,
-                               DispatcherId::fromUuid,
-                               10,
-                               Time.milliseconds(50L));
-
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
-                               rpcService,
-                               ResourceManagerGateway.class,
-                               ResourceManagerId::fromUuid,
-                               10,
-                               Time.milliseconds(50L));
-
-                       // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
-                       final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
-                       final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
-                       webMonitorEndpoint = createRestEndpoint(
-                               configuration,
-                               dispatcherGatewayRetriever,
-                               resourceManagerGatewayRetriever,
-                               transientBlobCache,
-                               rpcService.getExecutor(),
-                               new AkkaQueryServiceRetriever(actorSystem, 
timeout),
-                               
highAvailabilityServices.getWebMonitorLeaderElectionService());
-
-                       LOG.debug("Starting Dispatcher REST endpoint.");
-                       webMonitorEndpoint.start();
-
-                       resourceManager = createResourceManager(
-                               configuration,
-                               ResourceID.generate(),
-                               rpcService,
-                               highAvailabilityServices,
-                               heartbeatServices,
-                               metricRegistry,
-                               this,
-                               clusterInformation,
-                               webMonitorEndpoint.getRestBaseUrl());
-
-                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
-                               metricRegistry,
-                               rpcService.getAddress(),
-                               
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
-                       final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
-
-                       dispatcher = createDispatcher(
-                               configuration,
-                               rpcService,
-                               highAvailabilityServices,
-                               
resourceManager.getSelfGateway(ResourceManagerGateway.class),
-                               blobServer,
-                               heartbeatServices,
-                               jobManagerMetricGroup,
-                               metricRegistry.getMetricQueryServicePath(),
-                               archivedExecutionGraphStore,
-                               this,
-                               webMonitorEndpoint.getRestBaseUrl(),
-                               historyServerArchivist);
-
-                       LOG.debug("Starting ResourceManager.");
-                       resourceManager.start();
-                       
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
-
-                       LOG.debug("Starting Dispatcher.");
-                       dispatcher.start();
-                       
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+                                       commonRpcService.getAddress(),
+                                       blobServer.getPort()));
                }
        }
 
@@ -477,63 +360,6 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                }
        }
 
-       protected CompletableFuture<Void> stopClusterComponents() {
-               synchronized (lock) {
-
-                       Exception exception = null;
-
-                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
-
-                       if (dispatcherLeaderRetrievalService != null) {
-                               try {
-                                       dispatcherLeaderRetrievalService.stop();
-                               } catch (Exception e) {
-                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-                       }
-
-                       if (resourceManagerRetrievalService != null) {
-                               try {
-                                       resourceManagerRetrievalService.stop();
-                               } catch (Exception e) {
-                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-                       }
-
-                       if (webMonitorEndpoint != null) {
-                               
terminationFutures.add(webMonitorEndpoint.closeAsync());
-                       }
-
-                       if (dispatcher != null) {
-                               dispatcher.shutDown();
-                               
terminationFutures.add(dispatcher.getTerminationFuture());
-                       }
-
-                       if (resourceManager != null) {
-                               resourceManager.shutDown();
-                               
terminationFutures.add(resourceManager.getTerminationFuture());
-                       }
-
-                       if (exception != null) {
-                               
terminationFutures.add(FutureUtils.completedExceptionally(exception));
-                       }
-
-                       final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-
-                       if (jobManagerMetricGroup != null) {
-                               return FutureUtils.runAfterwards(
-                                       componentTerminationFuture,
-                                       () -> {
-                                               synchronized (lock) {
-                                                       
jobManagerMetricGroup.close();
-                                               }
-                                       });
-                       } else {
-                               return componentTerminationFuture;
-                       }
-               }
-       }
-
        @Override
        public void onFatalError(Throwable exception) {
                LOG.error("Fatal error occurred in the cluster entrypoint.", 
exception);
@@ -563,14 +389,10 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                if (isShutDown.compareAndSet(false, true)) {
                        LOG.info("Stopping {}.", getClass().getSimpleName());
 
-                       final CompletableFuture<Void> shutDownApplicationFuture 
= deregisterApplication(applicationStatus, diagnostics);
-
-                       final CompletableFuture<Void> componentShutdownFuture = 
FutureUtils.composeAfterwards(
-                               shutDownApplicationFuture,
-                               this::stopClusterComponents);
+                       final CompletableFuture<Void> shutDownApplicationFuture 
= closeClusterComponent(applicationStatus, diagnostics);
 
                        final CompletableFuture<Void> serviceShutdownFuture = 
FutureUtils.composeAfterwards(
-                               componentShutdownFuture,
+                               shutDownApplicationFuture,
                                () -> stopClusterServices(cleanupHaData));
 
                        final CompletableFuture<Void> cleanupDirectoriesFuture 
= FutureUtils.runAfterwards(
@@ -597,10 +419,11 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                boolean cleanupHaData) {
 
                if (isTerminating.compareAndSet(false, true)) {
-                       LOG.info("Shut down and terminate {} with return code 
{} and application status {}.",
+                       LOG.info("Shut down and terminate {} with return code 
{} and application status {}. Diagnostics {}.",
                                getClass().getSimpleName(),
                                returnCode,
-                               applicationStatus);
+                               applicationStatus,
+                               diagnostics);
 
                        shutDownAsync(
                                cleanupHaData,
@@ -628,11 +451,12 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
         * @param diagnostics additional information about the shut down, can 
be {@code null}
         * @return Future which is completed once the shut down
         */
-       private CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
+       private CompletableFuture<Void> closeClusterComponent(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
                synchronized (lock) {
-                       if (resourceManager != null) {
-                               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-                               return 
selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack 
-> null);
+                       if (dispatcherComponent != null) {
+                               final CompletableFuture<Void> 
deregisterApplicationFuture = 
dispatcherComponent.deregisterApplication(applicationStatus, diagnostics);
+                               // compose (not run): the result must also wait for the future returned by closeAsync()
+                               return 
FutureUtils.composeAfterwards(deregisterApplicationFuture, 
dispatcherComponent::closeAsync);
                        } else {
                                return CompletableFuture.completedFuture(null);
                        }
@@ -656,39 +480,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        // Abstract methods
        // --------------------------------------------------
 
-       protected abstract Dispatcher createDispatcher(
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               ResourceManagerGateway resourceManagerGateway,
-               BlobServer blobServer,
-               HeartbeatServices heartbeatServices,
-               JobManagerMetricGroup jobManagerMetricGroup,
-               @Nullable String metricQueryServicePath,
-               ArchivedExecutionGraphStore archivedExecutionGraphStore,
-               FatalErrorHandler fatalErrorHandler,
-               @Nullable String restAddress,
-               HistoryServerArchivist historyServerArchivist) throws Exception;
-
-       protected abstract ResourceManager<?> createResourceManager(
-               Configuration configuration,
-               ResourceID resourceId,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
-               FatalErrorHandler fatalErrorHandler,
-               ClusterInformation clusterInformation,
-               @Nullable String webInterfaceUrl) throws Exception;
-
-       protected abstract WebMonitorEndpoint<?> createRestEndpoint(
-               Configuration configuration,
-               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-               LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-               TransientBlobService transientBlobService,
-               Executor executor,
-               MetricQueryServiceRetriever metricQueryServiceRetriever,
-               LeaderElectionService leaderElectionService) throws Exception;
+       protected abstract ClusterComponent<?> 
createDispatcherComponent(Configuration configuration);
 
        protected abstract ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                Configuration configuration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
new file mode 100644
index 0000000..7a194f6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * {@link JobGraphRetriever} implementation which retrieves the {@link JobGraph} from
+ * a file on disk.
+ *
+ * <p>The file is read through an {@link ObjectInputStream}, i.e. it is expected to
+ * contain a Java-serialized {@link JobGraph}.
+ */
+public class FileJobGraphRetriever implements JobGraphRetriever {
+
+       /**
+        * Configuration option holding the path of the job graph file; defaults to
+        * {@code job.graph}.
+        */
+       public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = 
ConfigOptions
+               .key("internal.jobgraph-path")
+               .defaultValue("job.graph");
+
+       /** Path of the file the job graph is read from. */
+       @Nonnull
+       private final String jobGraphFile;
+
+       /**
+        * Creates a retriever which reads from the given file.
+        *
+        * @param jobGraphFile path of the file containing the serialized job graph
+        */
+       public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
+               this.jobGraphFile = jobGraphFile;
+       }
+
+       /**
+        * Reads and deserializes the {@link JobGraph} from the file given at construction time.
+        *
+        * @param configuration not used by this implementation
+        * @return the deserialized job graph
+        * @throws FlinkException if the file cannot be found, cannot be read, or a class
+        *         required for deserialization is missing
+        */
+       @Override
+       public JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
+               File fp = new File(jobGraphFile);
+
+               // try-with-resources closes both streams, also on failure
+               try (FileInputStream input = new FileInputStream(fp);
+                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
+
+                       // NOTE(review): native Java deserialization - the file must come from a trusted source
+                       return (JobGraph) obInput.readObject();
+               } catch (FileNotFoundException e) {
+                       throw new FlinkException("Could not find the JobGraph 
file.", e);
+               } catch (ClassNotFoundException | IOException e) {
+                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
+               }
+       }
+
+       /**
+        * Creates a retriever for the path stored under {@link #JOB_GRAPH_FILE_PATH}
+        * in the given configuration.
+        *
+        * @param configuration configuration to read the job graph file path from
+        * @return retriever reading from the configured path
+        */
+       public static FileJobGraphRetriever createFrom(Configuration 
configuration) {
+               return new 
FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
new file mode 100644
index 0000000..17583ac
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ClusterComponent} for a job cluster. The dispatcher component starts
+ * a {@link MiniDispatcher}.
+ */
+public class JobClusterComponent extends ClusterComponent<MiniDispatcher> {
+
+       /**
+        * Wires the component with a {@link JobDispatcherFactory}, the given resource manager
+        * factory and the {@link JobRestEndpointFactory} singleton.
+        *
+        * @param resourceManagerFactory factory handed to the base class
+        * @param jobActions retriever passed to the {@link JobDispatcherFactory}; despite its
+        *                   name this is a {@link JobGraphRetriever}
+        */
+       public JobClusterComponent(ResourceManagerFactory<?> 
resourceManagerFactory, JobGraphRetriever jobActions) {
+               super(new JobDispatcherFactory(jobActions), 
resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+       }
+
+       /**
+        * In addition to the registration done by the base class, completes the shut down
+        * future with the outcome of the dispatcher's job termination future.
+        *
+        * @param dispatcher dispatcher whose job termination future is observed
+        * @param shutDownFuture future to complete, either normally or exceptionally
+        */
+       @Override
+       protected void registerShutDownFuture(MiniDispatcher dispatcher, 
CompletableFuture<ApplicationStatus> shutDownFuture) {
+               super.registerShutDownFuture(dispatcher, shutDownFuture);
+
+               // forward the job's final status (or its failure) to the shut down future
+               
dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, 
throwable) -> {
+                       if (throwable != null) {
+                               shutDownFuture.completeExceptionally(throwable);
+                       } else {
+                               shutDownFuture.complete(applicationStatus);
+                       }
+               });
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 80a9da2..0426df9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -19,35 +19,9 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for per-job cluster entry points.
@@ -59,81 +33,9 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
        }
 
        @Override
-       protected MiniDispatcherRestEndpoint createRestEndpoint(
-                       Configuration configuration,
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                       TransientBlobService transientBlobService,
-                       Executor executor,
-                       MetricQueryServiceRetriever metricQueryServiceRetriever,
-                       LeaderElectionService leaderElectionService) throws 
Exception {
-               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
-
-               return new MiniDispatcherRestEndpoint(
-                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
-                       dispatcherGatewayRetriever,
-                       configuration,
-                       restHandlerConfiguration,
-                       resourceManagerGatewayRetriever,
-                       transientBlobService,
-                       executor,
-                       metricQueryServiceRetriever,
-                       leaderElectionService,
-                       this);
-       }
-
-       @Override
        protected ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                        Configuration configuration,
                        ScheduledExecutor scheduledExecutor) {
                return new MemoryArchivedExecutionGraphStore();
        }
-
-       @Override
-       protected Dispatcher createDispatcher(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       ResourceManagerGateway resourceManagerGateway,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       JobManagerMetricGroup jobManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
-                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress,
-                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
-
-               final JobGraph jobGraph = retrieveJobGraph(configuration);
-
-               final String executionModeValue = 
configuration.getString(EXECUTION_MODE);
-
-               final ExecutionMode executionMode = 
ExecutionMode.valueOf(executionModeValue);
-
-               final MiniDispatcher dispatcher = new MiniDispatcher(
-                       rpcService,
-                       Dispatcher.DISPATCHER_NAME,
-                       configuration,
-                       highAvailabilityServices,
-                       resourceManagerGateway,
-                       blobServer,
-                       heartbeatServices,
-                       jobManagerMetricGroup,
-                       metricQueryServicePath,
-                       archivedExecutionGraphStore,
-                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-                       fatalErrorHandler,
-                       restAddress,
-                       historyServerArchivist,
-                       jobGraph,
-                       executionMode);
-
-               registerShutdownActions(dispatcher.getJobTerminationFuture());
-
-               return dispatcher;
-       }
-
-       protected abstract JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkException;
-
-       protected abstract void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
new file mode 100644
index 0000000..e2ace15
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Interface for retrieving a {@link JobGraph} based on the cluster {@link Configuration}.
+ */
+public interface JobGraphRetriever {
+
+       /**
+        * Retrieves the {@link JobGraph} for the given cluster configuration.
+        *
+        * @param configuration cluster configuration
+        * @return the retrieved {@link JobGraph}
+        * @throws FlinkException if the {@link JobGraph} could not be retrieved
+        */
+       JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
new file mode 100644
index 0000000..8ab0701
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+
+/**
+ * {@link ClusterComponent} for session clusters; only the {@link ResourceManagerFactory} is supplied by the caller.
+ */
+public class SessionClusterComponent extends ClusterComponent<Dispatcher> {
+
+       public SessionClusterComponent(ResourceManagerFactory<?> 
resourceManagerFactory) {
+               super(SessionDispatcherFactory.INSTANCE, 
resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 40eb8b7..1fb693c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -22,35 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
-import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -77,62 +56,4 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        scheduledExecutor,
                        Ticker.systemTicker());
        }
-
-       @Override
-       protected DispatcherRestEndpoint createRestEndpoint(
-                       Configuration configuration,
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                       TransientBlobService transientBlobService,
-                       Executor executor,
-                       MetricQueryServiceRetriever metricQueryServiceRetriever,
-                       LeaderElectionService leaderElectionService) throws 
Exception {
-
-               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
-
-               return new DispatcherRestEndpoint(
-                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
-                       dispatcherGatewayRetriever,
-                       configuration,
-                       restHandlerConfiguration,
-                       resourceManagerGatewayRetriever,
-                       transientBlobService,
-                       executor,
-                       metricQueryServiceRetriever,
-                       leaderElectionService,
-                       this);
-       }
-
-       @Override
-       protected Dispatcher createDispatcher(
-                       Configuration configuration,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       ResourceManagerGateway resourceManagerGateway,
-                       BlobServer blobServer,
-                       HeartbeatServices heartbeatServices,
-                       JobManagerMetricGroup jobManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
-                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
-                       FatalErrorHandler fatalErrorHandler,
-                       @Nullable String restAddress,
-                       HistoryServerArchivist historyServerArchivist) throws 
Exception {
-
-               // create the default dispatcher
-               return new StandaloneDispatcher(
-                       rpcService,
-                       Dispatcher.DISPATCHER_NAME,
-                       configuration,
-                       highAvailabilityServices,
-                       resourceManagerGateway,
-                       blobServer,
-                       heartbeatServices,
-                       jobManagerMetricGroup,
-                       metricQueryServicePath,
-                       archivedExecutionGraphStore,
-                       Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-                       fatalErrorHandler,
-                       restAddress,
-                       historyServerArchivist);
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index edff87b..e92248c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,24 +19,12 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
-import javax.annotation.Nullable;
-
 /**
  * Entry point for the standalone session cluster.
  */
@@ -47,33 +35,8 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       resourceManagerRuntimeServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new StandaloneResourceManager(
-                       rpcService,
-                       FlinkResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       resourceManagerRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       resourceManagerRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler);
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new 
SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
        }
 
        public static void main(String[] args) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 0000000..91a7b26
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Factory for creating a {@link ResourceManager} from the configuration and the runtime services it is given.
+ *
+ * @param <T> type of the workers of the created {@link ResourceManager}
+ */
+public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {
+
+       ResourceManager<T> createResourceManager(
+               Configuration configuration,
+               ResourceID resourceId,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler,
+               ClusterInformation clusterInformation,
+               @Nullable String webInterfaceUrl) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
similarity index 56%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index edff87b..c8e314f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -16,38 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
 
 import javax.annotation.Nullable;
 
 /**
- * Entry point for the standalone session cluster.
+ * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
  */
-public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
-
-       public StandaloneSessionClusterEntrypoint(Configuration configuration) {
-               super(configuration);
-       }
+public enum StandaloneResourceManagerFactory implements 
ResourceManagerFactory<ResourceID> {
+       INSTANCE;
 
        @Override
-       protected ResourceManager<?> createResourceManager(
+       public ResourceManager<ResourceID> createResourceManager(
                        Configuration configuration,
                        ResourceID resourceId,
                        RpcService rpcService,
@@ -75,28 +65,4 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
                        clusterInformation,
                        fatalErrorHandler);
        }
-
-       public static void main(String[] args) {
-               // startup checks and logging
-               EnvironmentInformation.logEnvironmentInfo(LOG, 
StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
-               SignalHandler.register(LOG);
-               JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-               EntrypointClusterConfiguration entrypointClusterConfiguration = 
null;
-               final CommandLineParser<EntrypointClusterConfiguration> 
commandLineParser = new CommandLineParser<>(new 
EntrypointClusterConfigurationParserFactory());
-
-               try {
-                       entrypointClusterConfiguration = 
commandLineParser.parse(args);
-               } catch (FlinkParseException e) {
-                       LOG.error("Could not parse command line arguments {}.", 
args, e);
-                       
commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
-                       System.exit(1);
-               }
-
-               Configuration configuration = 
loadConfiguration(entrypointClusterConfiguration);
-
-               StandaloneSessionClusterEntrypoint entrypoint = new 
StandaloneSessionClusterEntrypoint(configuration);
-
-               entrypoint.startCluster();
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
new file mode 100644
index 0000000..da4b063
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}.
+ */
+public enum JobRestEndpointFactory implements 
RestEndpointFactory<RestfulGateway> {
+       INSTANCE;
+
+       @Override
+       public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
+                       Configuration configuration,
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+                       TransientBlobService transientBlobService,
+                       Executor executor,
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
+
+               return new MiniDispatcherRestEndpoint(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       dispatcherGatewayRetriever,
+                       configuration,
+                       restHandlerConfiguration,
+                       resourceManagerGatewayRetriever,
+                       transientBlobService,
+                       executor,
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       fatalErrorHandler);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
new file mode 100644
index 0000000..ffdc0cb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory for creating a {@link WebMonitorEndpoint} from the configuration, gateway retrievers and services it is given.
+ *
+ * @param <T> type of the {@link RestfulGateway} of the created {@link WebMonitorEndpoint}
+ */
+public interface RestEndpointFactory<T extends RestfulGateway> {
+
+       WebMonitorEndpoint<T> createRestEndpoint(
+               Configuration configuration,
+               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+               LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+               TransientBlobService transientBlobService,
+               Executor executor,
+               MetricQueryServiceRetriever metricQueryServiceRetriever,
+               LeaderElectionService leaderElectionService,
+               FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
new file mode 100644
index 0000000..359efbf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}; an enum with a single {@code INSTANCE}.
+ */
+public enum SessionRestEndpointFactory implements 
RestEndpointFactory<DispatcherGateway> {
+       INSTANCE;
+       /** Builds a {@link DispatcherRestEndpoint} from the given configuration, gateway retrievers and services. */
+       @Override
+       public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
+                       Configuration configuration,
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+                       TransientBlobService transientBlobService,
+                       Executor executor,
+                       MetricQueryServiceRetriever metricQueryServiceRetriever,
+                       LeaderElectionService leaderElectionService,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
+               final RestHandlerConfiguration restHandlerConfiguration = 
RestHandlerConfiguration.fromConfiguration(configuration);
+               // The same configuration also yields the RestServerEndpointConfiguration and is passed on unchanged.
+               return new DispatcherRestEndpoint(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       dispatcherGatewayRetriever,
+                       configuration,
+                       restHandlerConfiguration,
+                       resourceManagerGatewayRetriever,
+                       transientBlobService,
+                       executor,
+                       metricQueryServiceRetriever,
+                       leaderElectionService,
+                       fatalErrorHandler);
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 482e2b7..a52975a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,48 +19,27 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 
-       /** The job graph file path. */
-       public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
        private final String workingDirectory;
 
        public YarnJobClusterEntrypoint(
@@ -82,58 +61,10 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new YarnResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       configuration,
-                       System.getenv(),
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       webInterfaceUrl);
-       }
-
-       @Override
-       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
-               String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-               File fp = new File(jobGraphFile);
-
-               try (FileInputStream input = new FileInputStream(fp);
-                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
-
-                       return (JobGraph) obInput.readObject();
-               } catch (FileNotFoundException e) {
-                       throw new FlinkException("Could not find the JobGraph 
file.", e);
-               } catch (ClassNotFoundException | IOException e) {
-                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
-               }
-       }
-
-       @Override
-       protected void 
registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) 
{
-               terminationFuture.thenAccept((status) ->
-                       shutDownAndTerminate(status.processExitCode(), status, 
null, true));
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new JobClusterComponent(
+                       YarnResourceManagerFactory.INSTANCE, // factory which creates the YarnResourceManager
+                       FileJobGraphRetriever.createFrom(configuration)); // job graph retriever created from the configuration
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
similarity index 54%
copy from 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
copy to 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 6c92861..bfd1b4a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -21,56 +21,28 @@ package org.apache.flink.yarn.entrypoint;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.security.SecurityContext;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.YarnResourceManager;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.flink.yarn.YarnWorkerNode;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
- * Entry point for Yarn session clusters.
+ * {@link ResourceManagerFactory} implementation which creates a {@link 
YarnResourceManager}.
  */
-public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
-
-       private final String workingDirectory;
-
-       public YarnSessionClusterEntrypoint(
-                       Configuration configuration,
-                       String workingDirectory) {
-               super(configuration);
-               this.workingDirectory = 
Preconditions.checkNotNull(workingDirectory);
-       }
+public enum YarnResourceManagerFactory implements 
ResourceManagerFactory<YarnWorkerNode> {
+       INSTANCE;
 
        @Override
-       protected SecurityContext installSecurityContext(Configuration 
configuration) throws Exception {
-               return 
YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
-       }
-
-       @Override
-       protected String getRPCPortRange(Configuration configuration) {
-               return 
configuration.getString(YarnConfigOptions.APPLICATION_MASTER_PORT);
-       }
-
-       @Override
-       protected ResourceManager<?> createResourceManager(
+       public ResourceManager<YarnWorkerNode> createResourceManager(
                        Configuration configuration,
                        ResourceID resourceId,
                        RpcService rpcService,
@@ -101,33 +73,4 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                        fatalErrorHandler,
                        webInterfaceUrl);
        }
-
-       public static void main(String[] args) {
-               // startup checks and logging
-               EnvironmentInformation.logEnvironmentInfo(LOG, 
YarnSessionClusterEntrypoint.class.getSimpleName(), args);
-               SignalHandler.register(LOG);
-               JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-               Map<String, String> env = System.getenv();
-
-               final String workingDirectory = 
env.get(ApplicationConstants.Environment.PWD.key());
-               Preconditions.checkArgument(
-                       workingDirectory != null,
-                       "Working directory variable (%s) not set",
-                       ApplicationConstants.Environment.PWD.key());
-
-               try {
-                       YarnEntrypointUtils.logYarnEnvironmentInformation(env, 
LOG);
-               } catch (IOException e) {
-                       LOG.warn("Could not log YARN environment information.", 
e);
-               }
-
-               Configuration configuration = 
YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
-
-               YarnSessionClusterEntrypoint yarnSessionClusterEntrypoint = new 
YarnSessionClusterEntrypoint(
-                       configuration,
-                       workingDirectory);
-
-               yarnSessionClusterEntrypoint.startCluster();
-       }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 6c92861..116e2ff 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,29 +19,18 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -70,36 +59,8 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected ResourceManager<?> createResourceManager(
-                       Configuration configuration,
-                       ResourceID resourceId,
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
-                       FatalErrorHandler fatalErrorHandler,
-                       ClusterInformation clusterInformation,
-                       @Nullable String webInterfaceUrl) throws Exception {
-               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-                       rmServicesConfiguration,
-                       highAvailabilityServices,
-                       rpcService.getScheduledExecutor());
-
-               return new YarnResourceManager(
-                       rpcService,
-                       ResourceManager.RESOURCE_MANAGER_NAME,
-                       resourceId,
-                       configuration,
-                       System.getenv(),
-                       highAvailabilityServices,
-                       heartbeatServices,
-                       rmRuntimeServices.getSlotManager(),
-                       metricRegistry,
-                       rmRuntimeServices.getJobLeaderIdService(),
-                       clusterInformation,
-                       fatalErrorHandler,
-                       webInterfaceUrl);
+       protected ClusterComponent<?> createDispatcherComponent(Configuration 
configuration) {
+               return new 
SessionClusterComponent(YarnResourceManagerFactory.INSTANCE); // configuration is not read here; only the Yarn resource manager factory is passed
        }
 
        public static void main(String[] args) {

Reply via email to