This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8434d686473a088a876967dc2fe9b5dee0e3169
Author: Till Rohrmann <[email protected]>
AuthorDate: Sat Sep 22 23:16:50 2018 +0200

    [FLINK-10397] Remove CoreOptions#MODE
    
    Removes the MODE option used to switch between the new and legacy mode.
    
    This closes #6752.
---
 docs/_includes/generated/core_configuration.html   |  5 --
 .../org/apache/flink/client/LocalExecutor.java     | 50 ++++++----------
 .../org/apache/flink/client/RemoteExecutor.java    |  8 +--
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +---
 .../flink/client/cli/CliFrontendTestBase.java      | 26 +-------
 .../apache/flink/configuration/CoreOptions.java    | 22 -------
 .../org/apache/flink/api/scala/FlinkShell.scala    | 34 ++++-------
 .../apache/flink/api/scala/ScalaShellITCase.scala  |  1 -
 .../api/environment/RemoteStreamEnvironment.java   |  8 +--
 .../environment/StreamExecutionEnvironment.java    |  9 +--
 .../test/operators/RemoteEnvironmentITCase.java    | 69 ++++------------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |  5 --
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++-------
 13 files changed, 57 insertions(+), 224 deletions(-)

diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html
index 98cca91..4366e8b 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -28,11 +28,6 @@
             <td></td>
         </tr>
         <tr>
-            <td><h5>mode</h5></td>
-            <td style="word-wrap: break-word;">"new"</td>
-            <td>Switch to select the execution mode. Possible values are 'new' 
and 'legacy'.</td>
-        </tr>
-        <tr>
             <td><h5>parallelism.default</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 4e4993a..14d3ee5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,7 +37,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -125,39 +124,28 @@ public class LocalExecutor extends PlanExecutor {
        }
 
        private JobExecutorService createJobExecutorService(Configuration 
configuration) throws Exception {
-               final JobExecutorService newJobExecutorService;
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+               if (!configuration.contains(RestOptions.PORT)) {
+                       configuration.setInteger(RestOptions.PORT, 0);
+               }
 
-                       if (!configuration.contains(RestOptions.PORT)) {
-                               configuration.setInteger(RestOptions.PORT, 0);
-                       }
+               final MiniClusterConfiguration miniClusterConfiguration = new 
MiniClusterConfiguration.Builder()
+                       .setConfiguration(configuration)
+                       .setNumTaskManagers(
+                               configuration.getInteger(
+                                       
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+                                       
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
+                       .setRpcServiceSharing(RpcServiceSharing.SHARED)
+                       .setNumSlotsPerTaskManager(
+                               configuration.getInteger(
+                                       TaskManagerOptions.NUM_TASK_SLOTS, 1))
+                       .build();
 
-                       final MiniClusterConfiguration miniClusterConfiguration 
= new MiniClusterConfiguration.Builder()
-                               .setConfiguration(configuration)
-                               .setNumTaskManagers(
-                                       configuration.getInteger(
-                                               
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-                                               
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-                               .setRpcServiceSharing(RpcServiceSharing.SHARED)
-                               .setNumSlotsPerTaskManager(
-                                       configuration.getInteger(
-                                               
TaskManagerOptions.NUM_TASK_SLOTS, 1))
-                               .build();
-
-                       final MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration);
-                       miniCluster.start();
-
-                       configuration.setInteger(RestOptions.PORT, 
miniCluster.getRestAddress().getPort());
-
-                       newJobExecutorService = miniCluster;
-               } else {
-                       final LocalFlinkMiniCluster localFlinkMiniCluster = new 
LocalFlinkMiniCluster(configuration, true);
-                       localFlinkMiniCluster.start();
-
-                       newJobExecutorService = localFlinkMiniCluster;
-               }
+               final MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration);
+               miniCluster.start();
+
+               configuration.setInteger(RestOptions.PORT, 
miniCluster.getRestAddress().getPort());
 
-               return newJobExecutorService;
+               return miniCluster;
        }
 
        @Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 0a2f1b4..a4424eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
@@ -151,11 +149,7 @@ public class RemoteExecutor extends PlanExecutor {
        public void start() throws Exception {
                synchronized (lock) {
                        if (client == null) {
-                               if 
(CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE)))
 {
-                                       client = new 
StandaloneClusterClient(clientConfiguration);
-                               } else {
-                                       client = new 
RestClusterClient<>(clientConfiguration, "RemoteExecutor");
-                               }
+                               client = new 
RestClusterClient<>(clientConfiguration, "RemoteExecutor");
                                
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
                        }
                        else {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7e6344..c7c664d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -123,8 +123,6 @@ public class CliFrontend {
 
        private final int defaultParallelism;
 
-       private final boolean isNewMode;
-
        public CliFrontend(
                        Configuration configuration,
                        List<CustomCommandLine<?>> customCommandLines) throws 
Exception {
@@ -147,8 +145,6 @@ public class CliFrontend {
 
                this.clientTimeout = 
AkkaUtils.getClientTimeout(this.configuration);
                this.defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-
-               this.isNewMode = 
CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -233,7 +229,7 @@ public class CliFrontend {
                        final ClusterClient<T> client;
 
                        // directly deploy the job if the cluster is started in 
job mode and detached
-                       if (isNewMode && clusterId == null && 
runOptions.getDetachedMode()) {
+                       if (clusterId == null && runOptions.getDetachedMode()) {
                                int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
 
                                final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
@@ -1200,11 +1196,7 @@ public class CliFrontend {
                        LOG.warn("Could not load CLI class {}.", 
flinkYarnSessionCLI, e);
                }
 
-               if 
(configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE))
 {
-                       customCommandLines.add(new DefaultCLI(configuration));
-               } else {
-                       customCommandLines.add(new LegacyCLI(configuration));
-               }
+               customCommandLines.add(new DefaultCLI(configuration));
 
                return customCommandLines;
        }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 3c24376..8ff426c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -19,43 +19,21 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
 /**
- * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy 
mode.
+ * Base test class for {@link CliFrontend} tests.
  */
-@RunWith(Parameterized.class)
 public abstract class CliFrontendTestBase extends TestLogger {
-       @Parameterized.Parameter
-       public String mode;
-
-       @Parameterized.Parameters(name = "Mode = {0}")
-       public static List<String> parameters() {
-               return Arrays.asList(CoreOptions.LEGACY_MODE, 
CoreOptions.NEW_MODE);
-       }
 
        protected Configuration getConfiguration() {
                final Configuration configuration = GlobalConfiguration
                        .loadConfiguration(CliFrontendTestUtils.getConfigDir());
-               configuration.setString(CoreOptions.MODE, mode);
                return configuration;
        }
 
        static AbstractCustomCommandLine<?> getCli(Configuration configuration) 
{
-               switch (configuration.getString(CoreOptions.MODE)) {
-                       case CoreOptions.LEGACY_MODE:
-                               return new LegacyCLI(configuration);
-                       case CoreOptions.NEW_MODE:
-                               return new DefaultCLI(configuration);
-               }
-               throw new IllegalStateException();
+               return new DefaultCLI(configuration);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9ae807e..4c928fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -304,26 +304,4 @@ public class CoreOptions {
        public static ConfigOption<Long> 
fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
                return ConfigOptions.key("fs." + scheme + 
".limit.stream-timeout").defaultValue(0L);
        }
-
-       // 
------------------------------------------------------------------------
-       //  Distributed architecture
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Constant value for the new execution mode.
-        */
-       public static final String NEW_MODE = "new";
-
-       /**
-        * Constant value for the old execution mode.
-        */
-       public static final String LEGACY_MODE = "legacy";
-
-       /**
-        * Switch to select the execution mode. Possible values are {@link 
CoreOptions#NEW_MODE}
-        * and {@link CoreOptions#LEGACY_MODE}.
-        */
-       public static final ConfigOption<String> MODE = key("mode")
-               .defaultValue(NEW_MODE)
-               .withDescription("Switch to select the execution mode. Possible 
values are 'new' and 'legacy'.");
 }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index c04e845..d493495 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -139,36 +139,24 @@ object FlinkShell {
     }
   }
 
-  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
-
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
-          case CoreOptions.LEGACY_MODE => {
-            val cluster = new StandaloneMiniCluster(config)
-
-            (Left(cluster), cluster.getPort)
-          }
-          case CoreOptions.NEW_MODE => {
-            val miniClusterConfig = new MiniClusterConfiguration.Builder()
-              .setConfiguration(config)
-              .build()
-            val cluster = new MiniCluster(miniClusterConfig)
-            cluster.start()
-
-            (Right(cluster), cluster.getRestAddress.getPort)
-          }
-        }
+        val miniClusterConfig = new MiniClusterConfiguration.Builder()
+          .setConfiguration(config)
+          .build()
+        val cluster = new MiniCluster(miniClusterConfig)
+        cluster.start()
+        val port = cluster.getRestAddress.getPort
 
         println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
-        ("localhost", port, Some(Left(miniCluster)))
+        ("localhost", port, Some(Left(cluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -211,8 +199,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(_)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -242,8 +229,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 54bb16f..731bbf6 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -319,7 +319,6 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
     // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
     configuration.setInteger(RestOptions.PORT, 8082)
     val miniConfig = new MiniClusterConfiguration.Builder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9c36dab..0af6d93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,10 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -206,11 +204,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
                final ClusterClient<?> client;
                try {
-                       if 
(CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                               client = new 
StandaloneClusterClient(configuration);
-                       } else {
-                               client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
-                       }
+                       client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
                }
                catch (Exception e) {
                        throw new ProgramInvocationException("Cannot establish 
connection to JobManager: " + e.getMessage(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7259de..d4e14f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1653,13 +1652,9 @@ public abstract class StreamExecutionEnvironment {
        public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism, Configuration configuration) {
                final LocalStreamEnvironment currentEnvironment;
 
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                       currentEnvironment = new 
LocalStreamEnvironment(configuration);
-               } else {
-                       currentEnvironment = new 
LegacyLocalStreamEnvironment(configuration);
-               }
-
+               currentEnvironment = new LocalStreamEnvironment(configuration);
                currentEnvironment.setParallelism(parallelism);
+
                return currentEnvironment;
        }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index c2d6341..451108b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -20,37 +20,27 @@ package org.apache.flink.test.operators;
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -78,31 +68,22 @@ public class RemoteEnvironmentITCase extends TestLogger {
        public static void setupCluster() throws Exception {
                configuration = new Configuration();
 
-               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
-                       configuration.setInteger(WebOptions.PORT, 0);
-                       final MiniCluster miniCluster = new MiniCluster(
-                               new MiniClusterConfiguration.Builder()
-                                       .setConfiguration(configuration)
-                                       .setNumSlotsPerTaskManager(TM_SLOTS)
-                                       .build());
+               configuration.setInteger(WebOptions.PORT, 0);
+               final MiniCluster miniCluster = new MiniCluster(
+                       new MiniClusterConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumSlotsPerTaskManager(TM_SLOTS)
+                               .build());
 
-                       miniCluster.start();
+               miniCluster.start();
 
-                       final URI uri = miniCluster.getRestAddress();
-                       hostname = uri.getHost();
-                       port = uri.getPort();
+               final URI uri = miniCluster.getRestAddress();
+               hostname = uri.getHost();
+               port = uri.getPort();
 
-                       configuration.setInteger(WebOptions.PORT, port);
+               configuration.setInteger(WebOptions.PORT, port);
 
-                       resource = miniCluster;
-               } else {
-                       
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
-                       final StandaloneMiniCluster standaloneMiniCluster = new 
StandaloneMiniCluster(configuration);
-                       hostname = standaloneMiniCluster.getHostname();
-                       port = standaloneMiniCluster.getPort();
-
-                       resource = standaloneMiniCluster;
-               }
+               resource = miniCluster;
        }
 
        @AfterClass
@@ -111,32 +92,6 @@ public class RemoteEnvironmentITCase extends TestLogger {
        }
 
        /**
-        * Ensure that that Akka configuration parameters can be set.
-        */
-       @Test(expected = FlinkException.class)
-       public void testInvalidAkkaConfiguration() throws Throwable {
-               
assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
-               Configuration config = new Configuration();
-               config.setString(AkkaOptions.STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
-                               hostname,
-                               port,
-                               config
-               );
-               env.getConfig().disableSysoutLogging();
-
-               DataSet<String> result = env.createInput(new 
TestNonRichInputFormat());
-               result.output(new LocalCollectionOutputFormat<>(new 
ArrayList<String>()));
-               try {
-                       env.execute();
-                       Assert.fail("Program should not run successfully, cause 
of invalid akka settings.");
-               } catch (ProgramInvocationException ex) {
-                       throw ex.getCause();
-               }
-       }
-
-       /**
         * Ensure that the program parallelism can be set even if the 
configuration is supplied.
         */
        @Test
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index d6a029f..75204d9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
@@ -40,8 +39,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
@@ -53,7 +50,6 @@ import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-@RunWith(Parameterized.class)
 public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
        @Rule
@@ -74,7 +70,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
                String testJarPath = 
getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
                Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, mode);
                configuration.setString(JobManagerOptions.ADDRESS, "localhost");
                configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 65f813e..7ba2150 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -41,7 +41,6 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -163,8 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
        private final String yarnPropertiesFileLocation;
 
-       private final boolean isNewMode;
-
        private final YarnConfiguration yarnConfiguration;
 
        public FlinkYarnSessionCli(
@@ -185,8 +182,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
                this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
                this.acceptInteractiveInput = acceptInteractiveInput;
 
-               this.isNewMode = 
configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
-
                // Create the command line options
 
                query = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
@@ -375,10 +370,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
        }
 
        private ClusterSpecification createClusterSpecification(Configuration 
configuration, CommandLine cmd) {
-               if (!isNewMode && !cmd.hasOption(container.getOpt())) { // 
number of containers is required option!
-                       LOG.error("Missing required argument {}", 
container.getOpt());
-                       printUsage();
-                       throw new IllegalArgumentException("Missing required 
argument " + container.getOpt());
+               if (cmd.hasOption(container.getOpt())) { // number of 
containers is required option!
+                       LOG.info("The argument {} is deprecated in will be 
ignored.", container.getOpt());
                }
 
                // TODO: The number of task manager should be deprecated soon
@@ -989,20 +982,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
                yarnClient.init(yarnConfiguration);
                yarnClient.start();
 
-               if (isNewMode) {
-                       return new YarnClusterDescriptor(
-                               configuration,
-                               yarnConfiguration,
-                               configurationDirectory,
-                               yarnClient,
-                               false);
-               } else {
-                       return new LegacyYarnClusterDescriptor(
-                               configuration,
-                               yarnConfiguration,
-                               configurationDirectory,
-                               yarnClient,
-                               false);
-               }
+               return new YarnClusterDescriptor(
+                       configuration,
+                       yarnConfiguration,
+                       configurationDirectory,
+                       yarnClient,
+                       false);
        }
 }

Reply via email to