asfgit closed pull request #6752: [FLINK-10397] Remove CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/core_configuration.html 
b/docs/_includes/generated/core_configuration.html
index 98cca9125a0..4366e8b246f 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -27,11 +27,6 @@
             <td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. 
'_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in 
standalone.</td>
             <td></td>
         </tr>
-        <tr>
-            <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"new"</td>
-            <td>Switch to select the execution mode. Possible values are 'new' 
and 'legacy'.</td>
-        </tr>
         <tr>
             <td><h5>parallelism.default</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a88db..14d3ee50a14 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public void start() throws Exception {
        }
 
        private JobExecutorService createJobExecutorService(Configuration 
configuration) throws Exception {
-               final JobExecutorService newJobExecutorService;
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+               if (!configuration.contains(RestOptions.PORT)) {
+                       configuration.setInteger(RestOptions.PORT, 0);
+               }
 
-                       if (!configuration.contains(RestOptions.PORT)) {
-                               configuration.setInteger(RestOptions.PORT, 0);
-                       }
+               final MiniClusterConfiguration miniClusterConfiguration = new 
MiniClusterConfiguration.Builder()
+                       .setConfiguration(configuration)
+                       .setNumTaskManagers(
+                               configuration.getInteger(
+                                       
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+                                       
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+                       .setRpcServiceSharing(RpcServiceSharing.SHARED)
+                       .setNumSlotsPerTaskManager(
+                               configuration.getInteger(
+                                       TaskManagerOptions.NUM_TASK_SLOTS, 1))
+                       .build();
 
-                       final MiniClusterConfiguration miniClusterConfiguration 
= new MiniClusterConfiguration.Builder()
-                               .setConfiguration(configuration)
-                               .setNumTaskManagers(
-                                       configuration.getInteger(
-                                               
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-                                               
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-                               .setRpcServiceSharing(RpcServiceSharing.SHARED)
-                               .setNumSlotsPerTaskManager(
-                                       configuration.getInteger(
-                                               
TaskManagerOptions.NUM_TASK_SLOTS, 1))
-                               .build();
-
-                       final MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration);
-                       miniCluster.start();
-
-                       configuration.setInteger(RestOptions.PORT, 
miniCluster.getRestAddress().getPort());
-
-                       newJobExecutorService = miniCluster;
-               } else {
-                       final LocalFlinkMiniCluster localFlinkMiniCluster = new 
LocalFlinkMiniCluster(configuration, true);
-                       localFlinkMiniCluster.start();
-
-                       newJobExecutorService = localFlinkMiniCluster;
-               }
+               final MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration);
+               miniCluster.start();
+
+               configuration.setInteger(RestOptions.PORT, 
miniCluster.getRestAddress().getPort());
 
-               return newJobExecutorService;
+               return miniCluster;
        }
 
        @Override
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b49411..a4424eb0b96 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public int getDefaultParallelism() {
        public void start() throws Exception {
                synchronized (lock) {
                        if (client == null) {
-                               if 
(CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE)))
 {
-                                       client = new 
StandaloneClusterClient(clientConfiguration);
-                               } else {
-                                       client = new 
RestClusterClient<>(clientConfiguration, "RemoteExecutor");
-                               }
+                               client = new 
RestClusterClient<>(clientConfiguration, "RemoteExecutor");
                                
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
                        }
                        else {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344447e..c7c664d3f86 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@
 
        private final int defaultParallelism;
 
-       private final boolean isNewMode;
-
        public CliFrontend(
                        Configuration configuration,
                        List<CustomCommandLine<?>> customCommandLines) throws 
Exception {
@@ -147,8 +145,6 @@ public CliFrontend(
 
                this.clientTimeout = 
AkkaUtils.getClientTimeout(this.configuration);
                this.defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
-               this.isNewMode = 
CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ protected void run(String[] args) throws Exception {
                        final ClusterClient<T> client;
 
                        // directly deploy the job if the cluster is started in 
job mode and detached
-                       if (isNewMode && clusterId == null && 
runOptions.getDetachedMode()) {
+                       if (clusterId == null && runOptions.getDetachedMode()) {
                                int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
 
                                final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ static void setJobManagerAddressInConfig(Configuration 
config, InetSocketAddress
                        LOG.warn("Could not load CLI class {}.", 
flinkYarnSessionCLI, e);
                }
 
-               if 
(configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE))
 {
-                       customCommandLines.add(new DefaultCLI(configuration));
-               } else {
-                       customCommandLines.add(new LegacyCLI(configuration));
-               }
+               customCommandLines.add(new DefaultCLI(configuration));
 
                return customCommandLines;
        }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c243763a0b..8ff426c8057 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy 
mode.
+ * Base test class for {@link CliFrontend} tests.
  */
-@RunWith(Parameterized.class)
 public abstract class CliFrontendTestBase extends TestLogger {
-       @Parameterized.Parameter
-       public String mode;
-
-       @Parameterized.Parameters(name = "Mode = {0}")
-       public static List<String> parameters() {
-               return Arrays.asList(CoreOptions.LEGACY_MODE, 
CoreOptions.NEW_MODE);
-       }
 
        protected Configuration getConfiguration() {
                final Configuration configuration = GlobalConfiguration
                        .loadConfiguration(CliFrontendTestUtils.getConfigDir());
-               configuration.setString(CoreOptions.MODE, mode);
                return configuration;
        }
 
        static AbstractCustomCommandLine<?> getCli(Configuration configuration) 
{
-               switch (configuration.getString(CoreOptions.MODE)) {
-                       case CoreOptions.LEGACY_MODE:
-                               return new LegacyCLI(configuration);
-                       case CoreOptions.NEW_MODE:
-                               return new DefaultCLI(configuration);
-               }
-               throw new IllegalStateException();
+               return new DefaultCLI(configuration);
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807ef145..4c928fef686 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@
        public static ConfigOption<Long> 
fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
                return ConfigOptions.key("fs." + scheme + 
".limit.stream-timeout").defaultValue(0L);
        }
-
-       // 
------------------------------------------------------------------------
-       //  Distributed architecture
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Constant value for the new execution mode.
-        */
-       public static final String NEW_MODE = "new";
-
-       /**
-        * Constant value for the old execution mode.
-        */
-       public static final String LEGACY_MODE = "legacy";
-
-       /**
-        * Switch to select the execution mode. Possible values are {@link 
CoreOptions#NEW_MODE}
-        * and {@link CoreOptions#LEGACY_MODE}.
-        */
-       public static final ConfigOption<String> MODE = key("mode")
-               .defaultValue(NEW_MODE)
-               .withDescription("Switch to select the execution mode. Possible 
values are 'new' and 'legacy'.");
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a17227f79..9eaef34a33a 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@
  *
  * <p>Specialization of this class can be used for the session mode and the 
per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, 
FatalErrorHandler {
 
        public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
                .key("internal.cluster.execution-mode")
@@ -147,7 +148,7 @@ protected ClusterEntrypoint(Configuration configuration) {
                return terminationFuture;
        }
 
-       protected void startCluster() throws ClusterEntrypointException {
+       public void startCluster() throws ClusterEntrypointException {
                LOG.info("Starting {}.", getClass().getSimpleName());
 
                try {
@@ -312,6 +313,14 @@ protected MetricRegistryImpl 
createMetricRegistry(Configuration configuration) {
                return new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
        }
 
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return shutDownAsync(
+                       ApplicationStatus.UNKNOWN,
+                       "Cluster entrypoint has been closed externally.",
+                       true).thenAccept(ignored -> {});
+       }
+
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                synchronized (lock) {
                        Throwable exception = null;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2aba0..b07095c2b6d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ private void registerShutDownFuture() {
                return shutDownFuture;
        }
 
+       @Nonnull
+       public T getDispatcher() {
+               return dispatcher;
+       }
+
+       @Nonnull
+       public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+               return webMonitorEndpoint;
+       }
+
        @Override
        public CompletableFuture<Void> closeAsync() {
                if (isRunning.compareAndSet(true, false)) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 00000000000..85d3caa26fa
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerProcess.class);
+
+       /** ID for this JobManager. */
+       private final int id;
+
+       /** The configuration for the JobManager. */
+       private final Configuration config;
+
+       /** Configuration parsed as args for {@link 
JobManagerProcess.JobManagerProcessEntryPoint}. */
+       private final String[] jvmArgs;
+
+       /**
+        * Creates a {@link JobManager} running in a separate JVM.
+        *
+        * @param id     ID for the JobManager
+        * @param config Configuration for the job manager process
+        *
+        * @throws Exception
+        */
+       public DispatcherProcess(int id, Configuration config) throws Exception 
{
+               checkArgument(id >= 0, "Negative ID");
+               this.id = id;
+               this.config = checkNotNull(config, "Configuration");
+
+               ArrayList<String> args = new ArrayList<>();
+
+               for (Map.Entry<String, String> entry : 
config.toMap().entrySet()) {
+                       args.add("--" + entry.getKey());
+                       args.add(entry.getValue());
+               }
+
+               this.jvmArgs = new String[args.size()];
+               args.toArray(jvmArgs);
+       }
+
+       @Override
+       public String getName() {
+               return "JobManager " + id;
+       }
+
+       @Override
+       public String[] getJvmArgs() {
+               return jvmArgs;
+       }
+
+       @Override
+       public String getEntryPointClassName() {
+               return DispatcherProcessEntryPoint.class.getName();
+       }
+
+       public Configuration getConfig() {
+               return config;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("JobManagerProcess(id=%d)", id);
+       }
+
+       /**
+        * Entry point for the JobManager process.
+        */
+       public static class DispatcherProcessEntryPoint {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+               /**
+                * Entrypoint of the DispatcherProcessEntryPoint.
+                *
+                * <p>Other arguments are parsed to a {@link Configuration} and 
passed to the
+                * JobManager, for instance: <code>--high-availability 
ZOOKEEPER --high-availability.zookeeper.quorum
+                * "xyz:123:456"</code>.
+                */
+               public static void main(String[] args) {
+                       try {
+                               ParameterTool params = 
ParameterTool.fromArgs(args);
+                               Configuration config = 
params.getConfiguration();
+                               LOG.info("Configuration: {}.", config);
+
+                               config.setInteger(JobManagerOptions.PORT, 0);
+                               config.setInteger(RestOptions.PORT, 0);
+
+                               final StandaloneSessionClusterEntrypoint 
clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+                               
ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start JobManager process", 
t);
+                               System.exit(1);
+                       }
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
deleted file mode 100644
index b381f62b97e..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TaskManager} instance running in a separate JVM.
- */
-public class TaskManagerProcess extends TestJvmProcess {
-
-       /** ID for this TaskManager */
-       private final int id;
-
-       /** The configuration for the TaskManager */
-       private final Configuration config;
-
-       /** Configuration parsed as args for {@link 
TaskManagerProcess.TaskManagerProcessEntryPoint} */
-       private final String[] jvmArgs;
-
-       public TaskManagerProcess(int id, Configuration config) throws 
Exception {
-               checkArgument(id >= 0, "Negative ID");
-               this.id = id;
-               this.config = checkNotNull(config, "Configuration");
-
-               ArrayList<String> args = new ArrayList<>();
-
-               for (Map.Entry<String, String> entry : 
config.toMap().entrySet()) {
-                       args.add("--" + entry.getKey());
-                       args.add(entry.getValue());
-               }
-
-               this.jvmArgs = new String[args.size()];
-               args.toArray(jvmArgs);
-       }
-
-       @Override
-       public String getName() {
-               return "TaskManager " + id;
-       }
-
-       @Override
-       public String[] getJvmArgs() {
-               return jvmArgs;
-       }
-
-       @Override
-       public String getEntryPointClassName() {
-               return TaskManagerProcessEntryPoint.class.getName();
-       }
-
-       public int getId() {
-               return id;
-       }
-
-       @Override
-       public String toString() {
-               return String.format("TaskManagerProcess(id=%d)", id);
-       }
-
-       /**
-        * Entry point for the TaskManager process.
-        */
-       public static class TaskManagerProcessEntryPoint {
-
-               private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-               /**
-                * All arguments are parsed to a {@link Configuration} and 
passed to the Taskmanager,
-                * for instance: <code>--high-availability ZOOKEEPER 
--high-availability.zookeeper.quorum "xyz:123:456"</code>.
-                */
-               public static void main(String[] args) throws Exception {
-                       try {
-                               Configuration config = 
ParameterTool.fromArgs(args).getConfiguration();
-
-                               if 
(!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-                                       
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-                               }
-
-                               if 
(!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
-                                       
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-                               }
-
-
-                               LOG.info("Configuration: {}.", config);
-
-                               // Run the TaskManager
-                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(
-                                       config,
-                                       ResourceID.generate(),
-                                       TaskManager.class);
-
-                               // Run forever
-                               new CountDownLatch(1).await();
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Failed to start TaskManager 
process", t);
-                               System.exit(1);
-                       }
-               }
-       }
-
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf81e66..654b2bd66d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ protected void after() {
        public int getBlobServerPort() {
                return blobServer.getPort();
        }
+
+       public BlobServer getBlobServer() {
+               return blobServer;
+       }
 }
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845c2fc..d493495d484 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
     }
   }
 
-  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
-          case CoreOptions.LEGACY_MODE => {
-            val cluster = new StandaloneMiniCluster(config)
-
-            (Left(cluster), cluster.getPort)
-          }
-          case CoreOptions.NEW_MODE => {
-            val miniClusterConfig = new MiniClusterConfiguration.Builder()
-              .setConfiguration(config)
-              .build()
-            val cluster = new MiniCluster(miniClusterConfig)
-            cluster.start()
-
-            (Right(cluster), cluster.getRestAddress.getPort)
-          }
-        }
+        val miniClusterConfig = new MiniClusterConfiguration.Builder()
+          .setConfiguration(config)
+          .build()
+        val cluster = new MiniCluster(miniClusterConfig)
+        cluster.start()
+        val port = cluster.getRestAddress.getPort
 
         println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
-        ("localhost", port, Some(Left(miniCluster)))
+        ("localhost", port, Some(Left(cluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(_)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -242,8 +229,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
diff --git 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f038c..731bbf6b288 100644
--- 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
     // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
     configuration.setInteger(RestOptions.PORT, 8082)
     val miniConfig = new MiniClusterConfiguration.Builder()
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab75fd..0af6d937294 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ protected JobExecutionResult executeRemotely(StreamGraph 
streamGraph, List<URL>
 
                final ClusterClient<?> client;
                try {
-                       if 
(CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                               client = new 
StandaloneClusterClient(configuration);
-                       } else {
-                               client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
-                       }
+                       client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
                }
                catch (Exception e) {
                        throw new ProgramInvocationException("Cannot establish 
connection to JobManager: " + e.getMessage(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de196f..d4e14f07667 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public static LocalStreamEnvironment 
createLocalEnvironment(int parallelism) {
        public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism, Configuration configuration) {
                final LocalStreamEnvironment currentEnvironment;
 
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                       currentEnvironment = new 
LocalStreamEnvironment(configuration);
-               } else {
-                       currentEnvironment = new 
LegacyLocalStreamEnvironment(configuration);
-               }
-
+               currentEnvironment = new LocalStreamEnvironment(configuration);
                currentEnvironment.setParallelism(parallelism);
+
                return currentEnvironment;
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d63415b2a..451108b8627 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@
        public static void setupCluster() throws Exception {
                configuration = new Configuration();
 
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                       configuration.setInteger(WebOptions.PORT, 0);
-                       final MiniCluster miniCluster = new MiniCluster(
-                               new MiniClusterConfiguration.Builder()
-                                       .setConfiguration(configuration)
-                                       .setNumSlotsPerTaskManager(TM_SLOTS)
-                                       .build());
+               configuration.setInteger(WebOptions.PORT, 0);
+               final MiniCluster miniCluster = new MiniCluster(
+                       new MiniClusterConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumSlotsPerTaskManager(TM_SLOTS)
+                               .build());
 
-                       miniCluster.start();
+               miniCluster.start();
 
-                       final URI uri = miniCluster.getRestAddress();
-                       hostname = uri.getHost();
-                       port = uri.getPort();
+               final URI uri = miniCluster.getRestAddress();
+               hostname = uri.getHost();
+               port = uri.getPort();
 
-                       configuration.setInteger(WebOptions.PORT, port);
+               configuration.setInteger(WebOptions.PORT, port);
 
-                       resource = miniCluster;
-               } else {
-                       
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
-                       final StandaloneMiniCluster standaloneMiniCluster = new 
StandaloneMiniCluster(configuration);
-                       hostname = standaloneMiniCluster.getHostname();
-                       port = standaloneMiniCluster.getPort();
-
-                       resource = standaloneMiniCluster;
-               }
+               resource = miniCluster;
        }
 
        @AfterClass
@@ -110,32 +91,6 @@ public static void tearDownCluster() throws Exception {
                resource.close();
        }
 
-       /**
-        * Ensure that that Akka configuration parameters can be set.
-        */
-       @Test(expected = FlinkException.class)
-       public void testInvalidAkkaConfiguration() throws Throwable {
-               
assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
-               Configuration config = new Configuration();
-               config.setString(AkkaOptions.STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               hostname,
-                               port,
-                               config
-               );
-               env.getConfig().disableSysoutLogging();
-
-               DataSet<String> result = env.createInput(new 
TestNonRichInputFormat());
-               result.output(new LocalCollectionOutputFormat<>(new 
ArrayList<String>()));
-               try {
-                       env.execute();
-                       Assert.fail("Program should not run successfully, cause 
of invalid akka settings.");
-               } catch (ProgramInvocationException ex) {
-                       throw ex.getCause();
-               }
-       }
-
        /**
         * Ensure that the program parallelism can be set even if the 
configuration is supplied.
         */
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327adaae0..5d7f26bb886 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,28 +19,19 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -51,17 +42,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -91,6 +73,9 @@
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+       @Rule
+       public final BlobServerResource blobServerResource = new 
BlobServerResource();
+
        @Test
        public void testTaskManagerProcessFailure() throws Exception {
 
@@ -98,15 +83,24 @@ public void testTaskManagerProcessFailure() throws 
Exception {
                final StringWriter processOutput2 = new StringWriter();
                final StringWriter processOutput3 = new StringWriter();
 
-               ActorSystem jmActorSystem = null;
-               HighAvailabilityServices highAvailabilityServices = null;
                Process taskManagerProcess1 = null;
                Process taskManagerProcess2 = null;
                Process taskManagerProcess3 = null;
 
                File coordinateTempDir = null;
 
-               try {
+               final int jobManagerPort = NetUtils.getAvailablePort();
+               final int restPort = NetUtils.getAvailablePort();
+
+               Configuration jmConfig = new Configuration();
+               jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+               jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+               jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+               jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
500L);
+               jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 
10000L);
+               jmConfig.setInteger(RestOptions.PORT, restPort);
+
+               try (final StandaloneSessionClusterEntrypoint clusterEntrypoint 
= new StandaloneSessionClusterEntrypoint(jmConfig)) {
                        // check that we run this test only if the java command
                        // is available on this machine
                        String javaCommand = getJavaCommandPath();
@@ -123,37 +117,7 @@ public void testTaskManagerProcessFailure() throws 
Exception {
                        // coordination between the processes goes through a 
directory
                        coordinateTempDir = temporaryFolder.newFolder();
 
-                       // find a free port to start the JobManager
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
-
-                       Configuration jmConfig = new Configuration();
-                       
jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, 
"6 s");
-                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-                       
jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-                       jmConfig.setString(JobManagerOptions.ADDRESS, 
localAddress._1());
-                       jmConfig.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               jmConfig,
-                               TestingUtils.defaultExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<>(localAddress));
-                       ActorRef jmActor = JobManager.startJobManagerActors(
-                               jmConfig,
-                               jmActorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
+                       clusterEntrypoint.startCluster();
 
                        // the TaskManager java command
                        String[] command = new String[] {
@@ -162,7 +126,7 @@ public void testTaskManagerProcessFailure() throws 
Exception {
                                        "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms80m", "-Xmx80m",
                                        "-classpath", getCurrentClasspath(),
-                                       
TaskManagerProcessEntryPoint.class.getName(),
+                                       
TaskExecutorProcessEntryPoint.class.getName(),
                                        String.valueOf(jobManagerPort)
                        };
 
@@ -172,10 +136,6 @@ public void testTaskManagerProcessFailure() throws 
Exception {
                        taskManagerProcess2 = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-                       // we wait for the JobManager to have the two 
TaskManagers available
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 2, 
120000);
-
                        // the program will set a marker file in each of its 
parallel tasks once they are ready, so that
                        // this coordinating code is aware of this.
                        // the program will very slowly consume elements until 
the marker file (later created by the
@@ -188,7 +148,7 @@ public void testTaskManagerProcessFailure() throws 
Exception {
                                @Override
                                public void run() {
                                        try {
-                                               
testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+                                               
testTaskManagerFailure(restPort, coordinateDirClosure);
                                        }
                                        catch (Throwable t) {
                                                t.printStackTrace();
@@ -219,10 +179,6 @@ public void run() {
                        taskManagerProcess3 = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), 
processOutput3);
 
-                       // we wait for the third TaskManager to register
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 3, 
120000);
-
                        // kill one of the previous TaskManagers, triggering a 
failure and recovery
                        taskManagerProcess1.destroy();
                        taskManagerProcess1 = null;
@@ -269,13 +225,6 @@ public void run() {
                        if (taskManagerProcess3 != null) {
                                taskManagerProcess3.destroy();
                        }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
-
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
                }
        }
 
@@ -289,44 +238,6 @@ public void run() {
         */
        public abstract void testTaskManagerFailure(int jobManagerPort, File 
coordinateDir) throws Exception;
 
-       protected void waitUntilNumTaskManagersAreRegistered(ActorRef 
jobManager, int numExpected, long maxDelayMillis)
-                       throws Exception {
-               final long pollInterval = 10_000_000; // 10 ms = 10,000,000 
nanos
-               final long deadline = System.nanoTime() + maxDelayMillis * 
1_000_000;
-
-               long time;
-
-               while ((time = System.nanoTime()) < deadline) {
-                       FiniteDuration timeout = new 
FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-                       try {
-                               Future<?> result = Patterns.ask(jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
-
-                               int numTMs = (Integer) Await.result(result, 
timeout);
-
-                               if (numTMs == numExpected) {
-                                       return;
-                               }
-                       }
-                       catch (TimeoutException e) {
-                               // ignore and retry
-                       }
-                       catch (ClassCastException e) {
-                               fail("Wrong response: " + e.getMessage());
-                       }
-
-                       long timePassed = System.nanoTime() - time;
-                       long remainingMillis = (pollInterval - timePassed) / 
1_000_000;
-                       if (remainingMillis > 0) {
-                               Thread.sleep(remainingMillis);
-                       }
-               }
-
-               fail("The TaskManagers did not register within the expected 
time (" + maxDelayMillis + "msecs)");
-       }
-
        protected static void printProcessLog(String processName, String log) {
                if (log == null || log.length() == 0) {
                        return;
@@ -387,11 +298,11 @@ protected static boolean waitForMarkerFiles(File basedir, 
String prefix, int num
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * The entry point for the TaskManager JVM. Simply configures and runs 
a TaskManager.
+        * The entry point for the TaskExecutor JVM. Simply configures and runs 
a TaskExecutor.
         */
-       public static class TaskManagerProcessEntryPoint {
+       public static class TaskExecutorProcessEntryPoint {
 
-               private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+               private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
 
                public static void main(String[] args) {
                        try {
@@ -405,14 +316,7 @@ public static void main(String[] args) {
                                
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
                                cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
-                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
-                                       ResourceID.generate(), 
TaskManager.class);
-
-                               // wait forever
-                               Object lock = new Object();
-                               synchronized (lock) {
-                                       lock.wait();
-                               }
+                               TaskManagerRunner.runTaskManager(cfg, 
ResourceID.generate());
                        }
                        catch (Throwable t) {
                                LOG.error("Failed to start TaskManager 
process", t);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accffcbf8..9e9ce076197 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-       private static final ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+       private static ZooKeeperTestEnvironment zooKeeper;
 
        private static final FiniteDuration TestTimeOut = new FiniteDuration(5, 
TimeUnit.MINUTES);
 
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (ZooKeeper != null) {
-                       ZooKeeper.shutdown();
-               }
+       @BeforeClass
+       public static void setup() {
+               zooKeeper = new ZooKeeperTestEnvironment(1);
        }
 
        @Before
        public void cleanUp() throws Exception {
-               ZooKeeper.deleteAll();
+               zooKeeper.deleteAll();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (zooKeeper != null) {
+                       zooKeeper.shutdown();
+               }
        }
 
        protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public 
JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode executionMode
         */
        private void testJobManagerFailure(String zkQuorum, final File 
coordinateDir, final File zookeeperStoragePath) throws Exception {
                Configuration config = new Configuration();
-               config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkQuorum);
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ private void testJobManagerFailure(String zkQuorum, final 
File coordinateDir, fi
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
                                "leader", 1, config);
                env.setParallelism(PARALLELISM);
-               env.setNumberOfExecutionRetries(1);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0L));
                env.getConfig().setExecutionMode(executionMode);
                env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public void flatMap(Long value, Collector<Long> out) throws 
Exception {
        }
 
        @Test
-       public void testJobManagerProcessFailure() throws Exception {
+       public void testDispatcherProcessFailure() throws Exception {
+               final Time timeout = Time.seconds(30L);
                final File zookeeperStoragePath = temporaryFolder.newFolder();
 
                // Config
@@ -222,15 +232,11 @@ public void testJobManagerProcessFailure() throws 
Exception {
 
                assertEquals(PARALLELISM, numberOfTaskManagers * 
numberOfSlotsPerTaskManager);
 
-               // Setup
-               // Test actor system
-               ActorSystem testActorSystem;
-
                // Job managers
-               final JobManagerProcess[] jmProcess = new 
JobManagerProcess[numberOfJobManagers];
+               final DispatcherProcess[] dispatcherProcesses = new 
DispatcherProcess[numberOfJobManagers];
 
                // Task managers
-               final ActorSystem[] tmActorSystem = new 
ActorSystem[numberOfTaskManagers];
+               TaskManagerRunner[] taskManagerRunners = new 
TaskManagerRunner[numberOfTaskManagers];
 
                HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public void testJobManagerProcessFailure() throws 
Exception {
                // Coordination between the processes goes through a directory
                File coordinateTempDir = null;
 
+               // Cluster config
+               Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
+                       zooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
+               // Task manager configuration
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+               config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+               final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
                try {
                        final Deadline deadline = TestTimeOut.fromNow();
 
                        // Coordination directory
                        coordinateTempDir = temporaryFolder.newFolder();
 
-                       // Job Managers
-                       Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
-                                       ZooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
-
                        // Start first process
-                       jmProcess[0] = new JobManagerProcess(0, config);
-                       jmProcess[0].startProcess();
-
-                       // Task manager configuration
-                       
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-                       
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+                       dispatcherProcesses[0] = new DispatcherProcess(0, 
config);
+                       dispatcherProcesses[0].startProcess();
 
                        highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                                config,
@@ -264,27 +271,13 @@ public void testJobManagerProcessFailure() throws 
Exception {
 
                        // Start the task manager process
                        for (int i = 0; i < numberOfTaskManagers; i++) {
-                               tmActorSystem[i] = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-                               TaskManager.startTaskManagerComponentsAndActor(
-                                       config,
-                                       ResourceID.generate(),
-                                       tmActorSystem[i],
-                                       highAvailabilityServices,
-                                       NoOpMetricRegistry.INSTANCE,
-                                       "localhost",
-                                       Option.<String>empty(),
-                                       false,
-                                       TaskManager.class);
+                               taskManagerRunners[i] = new 
TaskManagerRunner(config, ResourceID.generate());
+                               taskManagerRunners[i].start();
                        }
 
-                       // Test actor system
-                       testActorSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-                       jmProcess[0].getActorRef(testActorSystem, 
deadline.timeLeft());
-
                        // Leader listener
                        TestingListener leaderListener = new TestingListener();
-                       leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+                       leaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
                        leaderRetrievalService.start(leaderListener);
 
                        // Initial submission
@@ -293,13 +286,14 @@ public void testJobManagerProcessFailure() throws 
Exception {
                        String leaderAddress = leaderListener.getAddress();
                        UUID leaderId = leaderListener.getLeaderSessionID();
 
-                       // Get the leader ref
-                       ActorRef leaderRef = 
AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-                       ActorGateway leaderGateway = new 
AkkaActorGateway(leaderRef, leaderId);
+                       final CompletableFuture<DispatcherGateway> 
dispatcherGatewayFuture = rpcService.connect(
+                               leaderAddress,
+                               DispatcherId.fromUuid(leaderId),
+                               DispatcherGateway.class);
+                       final DispatcherGateway dispatcherGateway = 
dispatcherGatewayFuture.get();
 
                        // Wait for all task managers to connect to the leading 
job manager
-                       
JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, 
leaderGateway,
-                                       deadline.timeLeft());
+                       waitForTaskManagers(numberOfTaskManagers, 
dispatcherGateway, deadline.timeLeft());
 
                        final File coordinateDirClosure = coordinateTempDir;
                        final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public void testJobManagerProcessFailure() throws Exception 
{
                                @Override
                                public void run() {
                                        try {
-                                               
testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, 
zookeeperStoragePath);
+                                               
testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, 
zookeeperStoragePath);
                                        }
                                        catch (Throwable t) {
                                                t.printStackTrace();
@@ -326,12 +320,10 @@ public void run() {
                                        READY_MARKER_FILE_PREFIX, PARALLELISM, 
deadline.timeLeft().toMillis());
 
                        // Kill one of the job managers and trigger recovery
-                       jmProcess[0].destroy();
+                       dispatcherProcesses[0].destroy();
 
-                       jmProcess[1] = new JobManagerProcess(1, config);
-                       jmProcess[1].startProcess();
-
-                       jmProcess[1].getActorRef(testActorSystem, 
deadline.timeLeft());
+                       dispatcherProcesses[1] = new DispatcherProcess(1, 
config);
+                       dispatcherProcesses[1].startProcess();
 
                        // we create the marker file which signals the program 
functions tasks that they can complete
                        
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new 
File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public void run() {
                        // for Travis and the root problem is not shown)
                        t.printStackTrace();
 
-                       for (JobManagerProcess p : jmProcess) {
+                       for (DispatcherProcess p : dispatcherProcesses) {
                                if (p != null) {
                                        p.printProcessLog();
                                }
@@ -368,8 +360,8 @@ public void run() {
                }
                finally {
                        for (int i = 0; i < numberOfTaskManagers; i++) {
-                               if (tmActorSystem[i] != null) {
-                                       tmActorSystem[i].shutdown();
+                               if (taskManagerRunners[i] != null) {
+                                       taskManagerRunners[i].close();
                                }
                        }
 
@@ -377,7 +369,7 @@ public void run() {
                                leaderRetrievalService.stop();
                        }
 
-                       for (JobManagerProcess jmProces : jmProcess) {
+                       for (DispatcherProcess jmProces : dispatcherProcesses) {
                                if (jmProces != null) {
                                        jmProces.destroy();
                                }
@@ -387,6 +379,8 @@ public void run() {
                                
highAvailabilityServices.closeAndCleanupAllData();
                        }
 
+                       RpcUtils.terminateRpcService(rpcService, timeout);
+
                        // Delete coordination directory
                        if (coordinateTempDir != null) {
                                try {
@@ -398,4 +392,14 @@ public void run() {
                }
        }
 
+       private void waitForTaskManagers(int numberOfTaskManagers, 
DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws 
ExecutionException, InterruptedException {
+               FutureUtils.retrySuccesfulWithDelay(
+                       () -> 
dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+                       Time.milliseconds(50L),
+                       
org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+                       clusterOverview -> 
clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+                       new 
ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+                       .get();
+       }
+
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410e3a6..afca8f12100 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+       @Rule
+       public final BlobServerResource blobServerResource = new 
BlobServerResource();
+
        @Test
        public void testCancelingOnProcessFailure() throws Exception {
                final StringWriter processOutput = new StringWriter();
+               final Time timeout = Time.minutes(2L);
 
-               ActorSystem jmActorSystem = null;
+               RestClusterClient<String> clusterClient = null;
                Process taskManagerProcess = null;
-               HighAvailabilityServices highAvailabilityServices = null;
+               final TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               Configuration jmConfig = new Configuration();
+               jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+               jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+               jmConfig.setInteger(RestOptions.PORT, 0);
+
+               final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+               final int jobManagerPort = rpcService.getPort();
+               jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+               final SessionDispatcherResourceManagerComponentFactory 
resourceManagerComponentFactory = new 
SessionDispatcherResourceManagerComponentFactory(
+                       StandaloneResourceManagerFactory.INSTANCE);
+               DispatcherResourceManagerComponent<?> 
dispatcherResourceManagerComponent = null;
+
+               try (final HighAvailabilityServices haServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+                               jmConfig,
+                               TestingUtils.defaultExecutor(),
+                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-               try {
                        // check that we run this test only if the java command
                        // is available on this machine
                        String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public void testCancelingOnProcessFailure() throws 
Exception {
                        tempLogFile.deleteOnExit();
                        CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-                       // find a free port to start the JobManager
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
-
-                       Configuration jmConfig = new Configuration();
-                       
jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, 
"2000 s");
-                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-                       jmConfig.setString(JobManagerOptions.ADDRESS, 
localAddress._1());
-                       jmConfig.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               jmConfig,
-                               TestingUtils.defaultExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<>(localAddress));
-                       ActorRef jmActor = JobManager.startJobManagerActors(
+                       dispatcherResourceManagerComponent = 
resourceManagerComponentFactory.create(
                                jmConfig,
-                               jmActorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
+                               rpcService,
+                               haServices,
+                               blobServerResource.getBlobServer(),
+                               new HeartbeatServices(100L, 1000L),
                                NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
+                               new MemoryArchivedExecutionGraphStore(),
+                               fatalErrorHandler);
+
+                       // update the rest ports
+                       final int restPort = dispatcherResourceManagerComponent
+                               .getWebMonitorEndpoint()
+                               .getServerAddress()
+                               .getPort();
+                       jmConfig.setInteger(RestOptions.PORT, restPort);
 
                        // the TaskManager java command
                        String[] command = new String[] {
@@ -134,7 +148,7 @@ public void testCancelingOnProcessFailure() throws 
Exception {
                                        "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms80m", "-Xmx80m",
                                        "-classpath", getCurrentClasspath(),
-                                       
AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+                                       
AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
                                        String.valueOf(jobManagerPort)
                        };
 
@@ -142,21 +156,14 @@ public void testCancelingOnProcessFailure() throws 
Exception {
                        taskManagerProcess = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
 
-                       // we wait for the JobManager to have the two 
TaskManagers available
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 1, 
120000);
-
                        final Throwable[] errorRef = new Throwable[1];
 
-                       final Configuration configuration = new Configuration();
-                       configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
-
                        // start the test program, which infinitely blocks
                        Runnable programRunner = new Runnable() {
                                @Override
                                public void run() {
                                        try {
-                                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
+                                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new 
Configuration());
                                                env.setParallelism(2);
                                                
env.setRestartStrategy(RestartStrategies.noRestart());
                                                
env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public Long map(Long value) throws Exception {
                        Thread programThread = new Thread(programRunner);
 
                        // kill the TaskManager
+                       programThread.start();
+
+                       final LeaderConnectionInfo leaderConnectionInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(),
 Time.seconds(10L));
+
+                       final DispatcherGateway dispatcherGateway = 
rpcService.connect(
+                               leaderConnectionInfo.getAddress(),
+                               
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+                               DispatcherGateway.class).get();
+
+                       waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+                       clusterClient = new RestClusterClient<>(jmConfig, 
"standalone");
+
+                       final Collection<JobID> jobIds = 
waitForRunningJobs(clusterClient, timeout);
+
+                       assertThat(jobIds, hasSize(1));
+                       final JobID jobId = jobIds.iterator().next();
+
+                       // kill the TaskManager after the job started to run
                        taskManagerProcess.destroy();
                        taskManagerProcess = null;
 
-                       // immediately submit the job. this should hit the case
-                       // where the JobManager still thinks it has the 
TaskManager and tries to send it tasks
-                       programThread.start();
-
                        // try to cancel the job
-                       cancelRunningJob(jmActor);
+                       clusterClient.cancel(jobId);
 
                        // we should see a failure within reasonable time (10s 
is the ask timeout).
                        // since the CI environment is often slow, we 
conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public Long map(Long value) throws Exception {
                        if (taskManagerProcess != null) {
                                taskManagerProcess.destroy();
                        }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
-
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
-               }
-       }
-
-       private void cancelRunningJob(ActorRef jobManager) throws Exception {
-               final FiniteDuration askTimeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               // try at most for 30 seconds
-               final long deadline = System.currentTimeMillis() + 30000;
-
-               JobID jobId = null;
-
-               do {
-                       Future<Object> response = Patterns.ask(jobManager,
-                                       
JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-                       Object result;
-                       try {
-                               result = Await.result(response, askTimeout);
+                       if (clusterClient != null) {
+                               clusterClient.shutdown();
                        }
-                       catch (Exception e) {
-                               throw new Exception("Could not retrieve running 
jobs from the JobManager.", e);
+                       if (dispatcherResourceManagerComponent != null) {
+                               dispatcherResourceManagerComponent.close();
                        }
 
-                       if (result instanceof 
JobManagerMessages.RunningJobsStatus) {
-
-                               List<JobStatusMessage> jobs = 
((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+                       fatalErrorHandler.rethrowError();
 
-                               if (jobs.size() == 1) {
-                                       jobId = jobs.get(0).getJobId();
-                                       break;
-                               }
-                       }
-               }
-               while (System.currentTimeMillis() < deadline);
-
-               if (jobId == null) {
-                       // we never found it running, must have failed already
-                       return;
+                       RpcUtils.terminateRpcService(rpcService, 
Time.seconds(10L));
                }
-
-               // tell the JobManager to cancel the job
-               jobManager.tell(
-                       new JobManagerMessages.LeaderSessionMessage(
-                               HighAvailabilityServices.DEFAULT_LEADER_ID,
-                               new JobManagerMessages.CancelJob(jobId)),
-                       ActorRef.noSender());
        }
 
-       private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, 
int numExpected, long maxDelay)
-                       throws Exception {
-               final long deadline = System.currentTimeMillis() + maxDelay;
-               while (true) {
-                       long remaining = deadline - System.currentTimeMillis();
-                       if (remaining <= 0) {
-                               fail("The TaskManagers did not register within 
the expected time (" + maxDelay + "msecs)");
-                       }
-
-                       FiniteDuration timeout = new FiniteDuration(remaining, 
TimeUnit.MILLISECONDS);
+       private void waitUntilAllSlotsAreUsed(DispatcherGateway 
dispatcherGateway, Time timeout) throws ExecutionException, 
InterruptedException {
+               FutureUtils.retrySuccesfulWithDelay(
+                       () -> dispatcherGateway.requestClusterOverview(timeout),
+                       Time.milliseconds(50L),
+                       
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+                       clusterOverview -> 
clusterOverview.getNumTaskManagersConnected() >= 1 &&
+                               clusterOverview.getNumSlotsAvailable() == 0 &&
+                               clusterOverview.getNumSlotsTotal() == 2,
+                       TestingUtils.defaultScheduledExecutor())
+                       .get();
+       }
 
-                       try {
-                               Future<?> result = Patterns.ask(jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
-                               Integer numTMs = (Integer) Await.result(result, 
timeout);
-                               if (numTMs == numExpected) {
-                                       break;
-                               }
-                       }
-                       catch (TimeoutException e) {
-                               // ignore and retry
-                       }
-                       catch (ClassCastException e) {
-                               fail("Wrong response: " + e.getMessage());
-                       }
-               }
+       private Collection<JobID> waitForRunningJobs(ClusterClient<?> 
clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+               return FutureUtils.retrySuccesfulWithDelay(
+                               
CheckedSupplier.unchecked(clusterClient::listJobs),
+                               Time.milliseconds(50L),
+                               
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+                               jobs -> !jobs.isEmpty(),
+                               TestingUtils.defaultScheduledExecutor())
+                       .get()
+                       .stream()
+                       .map(JobStatusMessage::getJobId)
+                       .collect(Collectors.toList());
        }
 
        private void printProcessLog(String processName, String log) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0cf1d6..4815c4938f7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public 
TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode)
        public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
                env.setParallelism(PARALLELISM);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0L));
                env.getConfig().setExecutionMode(executionMode);
                env.getConfig().disableSysoutLogging();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a7993c45..fbf6b5b71e3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public void testTaskManagerFailure(int jobManagerPort, final 
File coordinateDir)
                final File tempCheckpointDir = tempFolder.newFolder();
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                        "localhost",
                        jobManagerPort,
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f21b9..75204d9fa21 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-@RunWith(Parameterized.class)
 public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
        @Rule
@@ -74,7 +70,6 @@ public void testRun() throws Exception {
                String testJarPath = 
getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
                Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, mode);
                configuration.setString(JobManagerOptions.ADDRESS, "localhost");
                configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e52f7..7ba21502e67 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -163,8 +162,6 @@
 
        private final String yarnPropertiesFileLocation;
 
-       private final boolean isNewMode;
-
        private final YarnConfiguration yarnConfiguration;
 
        public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public FlinkYarnSessionCli(
                this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
                this.acceptInteractiveInput = acceptInteractiveInput;
 
-               this.isNewMode = 
configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
                // Create the command line options
 
                query = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ private AbstractYarnClusterDescriptor createDescriptor(
        }
 
        private ClusterSpecification createClusterSpecification(Configuration 
configuration, CommandLine cmd) {
-               if (!isNewMode && !cmd.hasOption(container.getOpt())) { // 
number of containers is required option!
-                       LOG.error("Missing required argument {}", 
container.getOpt());
-                       printUsage();
-                       throw new IllegalArgumentException("Missing required 
argument " + container.getOpt());
+               if (cmd.hasOption(container.getOpt())) { // number of 
containers is required option!
+                       LOG.info("The argument {} is deprecated in will be 
ignored.", container.getOpt());
                }
 
                // TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ private AbstractYarnClusterDescriptor 
getClusterDescriptor(
                yarnClient.init(yarnConfiguration);
                yarnClient.start();
 
-               if (isNewMode) {
-                       return new YarnClusterDescriptor(
-                               configuration,
-                               yarnConfiguration,
-                               configurationDirectory,
-                               yarnClient,
-                               false);
-               } else {
-                       return new LegacyYarnClusterDescriptor(
-                               configuration,
-                               yarnConfiguration,
-                               configurationDirectory,
-                               yarnClient,
-                               false);
-               }
+               return new YarnClusterDescriptor(
+                       configuration,
+                       yarnConfiguration,
+                       configurationDirectory,
+                       yarnClient,
+                       false);
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to