This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8434d686473a088a876967dc2fe9b5dee0e3169 Author: Till Rohrmann <[email protected]> AuthorDate: Sat Sep 22 23:16:50 2018 +0200 [FLINK-10397] Remove CoreOptions#MODE Removes the MODE option used to switch between the new and legacy mode. This closes #6752. --- docs/_includes/generated/core_configuration.html | 5 -- .../org/apache/flink/client/LocalExecutor.java | 50 ++++++---------- .../org/apache/flink/client/RemoteExecutor.java | 8 +-- .../org/apache/flink/client/cli/CliFrontend.java | 12 +--- .../flink/client/cli/CliFrontendTestBase.java | 26 +------- .../apache/flink/configuration/CoreOptions.java | 22 ------- .../org/apache/flink/api/scala/FlinkShell.scala | 34 ++++------- .../apache/flink/api/scala/ScalaShellITCase.scala | 1 - .../api/environment/RemoteStreamEnvironment.java | 8 +-- .../environment/StreamExecutionEnvironment.java | 9 +-- .../test/operators/RemoteEnvironmentITCase.java | 69 ++++------------------ .../flink/yarn/CliFrontendRunWithYarnTest.java | 5 -- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++------- 13 files changed, 57 insertions(+), 224 deletions(-) diff --git a/docs/_includes/generated/core_configuration.html b/docs/_includes/generated/core_configuration.html index 98cca91..4366e8b 100644 --- a/docs/_includes/generated/core_configuration.html +++ b/docs/_includes/generated/core_configuration.html @@ -28,11 +28,6 @@ <td></td> </tr> <tr> - <td><h5>mode</h5></td> - <td style="word-wrap: break-word;">"new"</td> - <td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td> - </tr> - <tr> <td><h5>parallelism.default</h5></td> <td style="word-wrap: break-word;">1</td> <td></td> diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 4e4993a..14d3ee5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -37,7 +37,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.JobExecutorService; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -125,39 +124,28 @@ public class LocalExecutor extends PlanExecutor { } private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception { - final JobExecutorService newJobExecutorService; - if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { + if (!configuration.contains(RestOptions.PORT)) { + configuration.setInteger(RestOptions.PORT, 0); + } - if (!configuration.contains(RestOptions.PORT)) { - configuration.setInteger(RestOptions.PORT, 0); - } + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumTaskManagers( + configuration.getInteger( + ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, + ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)) + .setRpcServiceSharing(RpcServiceSharing.SHARED) + .setNumSlotsPerTaskManager( + configuration.getInteger( + TaskManagerOptions.NUM_TASK_SLOTS, 1)) + .build(); - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumTaskManagers( - configuration.getInteger( - ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, - ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)) - .setRpcServiceSharing(RpcServiceSharing.SHARED) - .setNumSlotsPerTaskManager( - configuration.getInteger( - TaskManagerOptions.NUM_TASK_SLOTS, 1)) - .build(); - - final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); - miniCluster.start(); - - configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); - - newJobExecutorService = miniCluster; - } else { - final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true); - localFlinkMiniCluster.start(); - - newJobExecutorService = localFlinkMiniCluster; - } + final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); + miniCluster.start(); + + configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); - return newJobExecutorService; + return miniCluster; } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index 0a2f1b4..a4424eb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -24,10 +24,8 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; -import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.optimizer.DataStatistics; @@ -151,11 +149,7 @@ public class RemoteExecutor extends PlanExecutor { public void start() throws Exception { synchronized (lock) { if (client == null) { - if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) { - client = new StandaloneClusterClient(clientConfiguration); - } else { - client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor"); - } + client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor"); client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); } else { diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index c7e6344..c7c664d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -123,8 +123,6 @@ public class CliFrontend { private final int defaultParallelism; - private final boolean isNewMode; - public CliFrontend( Configuration configuration, List<CustomCommandLine<?>> customCommandLines) throws Exception { @@ -147,8 +145,6 @@ public class CliFrontend { this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration); this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); - - this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)); } // -------------------------------------------------------------------------------------------- @@ -233,7 +229,7 @@ public class CliFrontend { final ClusterClient<T> client; // directly deploy the job if the cluster is started in job mode and detached - if (isNewMode && clusterId == null && runOptions.getDetachedMode()) { + if (clusterId == null && runOptions.getDetachedMode()) { int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism(); final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); @@ -1200,11 +1196,7 @@ public class CliFrontend { LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e); } - if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) { - customCommandLines.add(new DefaultCLI(configuration)); - } else { - customCommandLines.add(new LegacyCLI(configuration)); - } + customCommandLines.add(new DefaultCLI(configuration)); return customCommandLines; } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java index 3c24376..8ff426c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java @@ -19,43 +19,21 @@ package org.apache.flink.client.cli; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.util.TestLogger; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - /** - * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode. + * Base test class for {@link CliFrontend} tests. */ -@RunWith(Parameterized.class) public abstract class CliFrontendTestBase extends TestLogger { - @Parameterized.Parameter - public String mode; - - @Parameterized.Parameters(name = "Mode = {0}") - public static List<String> parameters() { - return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE); - } protected Configuration getConfiguration() { final Configuration configuration = GlobalConfiguration .loadConfiguration(CliFrontendTestUtils.getConfigDir()); - configuration.setString(CoreOptions.MODE, mode); return configuration; } static AbstractCustomCommandLine<?> getCli(Configuration configuration) { - switch (configuration.getString(CoreOptions.MODE)) { - case CoreOptions.LEGACY_MODE: - return new LegacyCLI(configuration); - case CoreOptions.NEW_MODE: - return new DefaultCLI(configuration); - } - throw new IllegalStateException(); + return new DefaultCLI(configuration); } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 9ae807e..4c928fe 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -304,26 +304,4 @@ public class CoreOptions { public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) { return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L); } - - // ------------------------------------------------------------------------ - // Distributed architecture - // ------------------------------------------------------------------------ - - /** - * Constant value for the new execution mode. - */ - public static final String NEW_MODE = "new"; - - /** - * Constant value for the old execution mode. - */ - public static final String LEGACY_MODE = "legacy"; - - /** - * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE} - * and {@link CoreOptions#LEGACY_MODE}. - */ - public static final ConfigOption<String> MODE = key("mode") - .defaultValue(NEW_MODE) - .withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'."); } diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index c04e845..d493495 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -139,36 +139,24 @@ object FlinkShell { } } - private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster] - def fetchConnectionInfo( configuration: Configuration, config: Config - ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = { + ): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = { config.executionMode match { case ExecutionMode.LOCAL => // Local mode val config = configuration config.setInteger(JobManagerOptions.PORT, 0) - val (miniCluster, port) = config.getString(CoreOptions.MODE) match { - case CoreOptions.LEGACY_MODE => { - val cluster = new StandaloneMiniCluster(config) - - (Left(cluster), cluster.getPort) - } - case CoreOptions.NEW_MODE => { - val miniClusterConfig = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .build() - val cluster = new MiniCluster(miniClusterConfig) - cluster.start() - - (Right(cluster), cluster.getRestAddress.getPort) - } - } + val miniClusterConfig = new MiniClusterConfiguration.Builder() + .setConfiguration(config) + .build() + val cluster = new MiniCluster(miniClusterConfig) + cluster.start() + val port = cluster.getRestAddress.getPort println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") - ("localhost", port, Some(Left(miniCluster))) + ("localhost", port, Some(Left(cluster))) case ExecutionMode.REMOTE => // Remote mode if (config.host.isEmpty || config.port.isEmpty) { @@ -211,8 +199,7 @@ object FlinkShell { val (repl, cluster) = try { val (host, port, cluster) = fetchConnectionInfo(configuration, config) val conf = cluster match { - case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration - case Some(Left(Right(_))) => configuration + case Some(Left(_)) => configuration case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration case None => configuration } @@ -242,8 +229,7 @@ object FlinkShell { } finally { repl.closeInterpreter() cluster match { - case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close() - case Some(Left(Right(newMiniCluster))) => newMiniCluster.close() + case Some(Left(miniCluster)) => miniCluster.close() case Some(Right(yarnCluster)) => yarnCluster.shutdown() case _ => } diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index 54bb16f..731bbf6 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -319,7 +319,6 @@ object ScalaShellITCase { @BeforeClass def beforeAll(): Unit = { - configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE) // set to different than default so not to interfere with ScalaShellLocalStartupITCase configuration.setInteger(RestOptions.PORT, 8082) val miniConfig = new MiniClusterConfiguration.Builder() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 9c36dab..0af6d93 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -24,10 +24,8 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -206,11 +204,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { final ClusterClient<?> client; try { - if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) { - client = new StandaloneClusterClient(configuration); - } else { - client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment"); - } + client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment"); } catch (Exception e) { throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index b7259de..d4e14f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -47,7 +47,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.client.program.PreviewPlanEnvironment; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -1653,13 +1652,9 @@ public abstract class StreamExecutionEnvironment { public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { final LocalStreamEnvironment currentEnvironment; - if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { - currentEnvironment = new LocalStreamEnvironment(configuration); - } else { - currentEnvironment = new LegacyLocalStreamEnvironment(configuration); - } - + currentEnvironment = new LocalStreamEnvironment(configuration); currentEnvironment.setParallelism(parallelism); + return currentEnvironment; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index c2d6341..451108b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -20,37 +20,27 @@ package org.apache.flink.test.operators; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.io.GenericInputFormat; -import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.minicluster.StandaloneMiniCluster; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.Collector; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeTrue; /** * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}. @@ -78,31 +68,22 @@ public class RemoteEnvironmentITCase extends TestLogger { public static void setupCluster() throws Exception { configuration = new Configuration(); - if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { - configuration.setInteger(WebOptions.PORT, 0); - final MiniCluster miniCluster = new MiniCluster( - new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumSlotsPerTaskManager(TM_SLOTS) - .build()); + configuration.setInteger(WebOptions.PORT, 0); + final MiniCluster miniCluster = new MiniCluster( + new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumSlotsPerTaskManager(TM_SLOTS) + .build()); - miniCluster.start(); + miniCluster.start(); - final URI uri = miniCluster.getRestAddress(); - hostname = uri.getHost(); - port = uri.getPort(); + final URI uri = miniCluster.getRestAddress(); + hostname = uri.getHost(); + port = uri.getPort(); - configuration.setInteger(WebOptions.PORT, port); + configuration.setInteger(WebOptions.PORT, port); - resource = miniCluster; - } else { - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS); - final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration); - hostname = standaloneMiniCluster.getHostname(); - port = standaloneMiniCluster.getPort(); - - resource = standaloneMiniCluster; - } + resource = miniCluster; } @AfterClass @@ -111,32 +92,6 @@ public class RemoteEnvironmentITCase extends TestLogger { } /** - * Ensure that that Akka configuration parameters can be set. - */ - @Test(expected = FlinkException.class) - public void testInvalidAkkaConfiguration() throws Throwable { - assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE))); - Configuration config = new Configuration(); - config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); - - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( - hostname, - port, - config - ); - env.getConfig().disableSysoutLogging(); - - DataSet<String> result = env.createInput(new TestNonRichInputFormat()); - result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>())); - try { - env.execute(); - Assert.fail("Program should not run successfully, cause of invalid akka settings."); - } catch (ProgramInvocationException ex) { - throw ex.getCause(); - } - } - - /** * Ensure that the program parallelism can be set even if the configuration is supplied. */ @Test diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java index d6a029f..75204d9 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java @@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; @@ -40,8 +39,6 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import static junit.framework.TestCase.assertTrue; import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend; @@ -53,7 +50,6 @@ import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath; * * @see org.apache.flink.client.cli.CliFrontendRunTest */ -@RunWith(Parameterized.class) public class CliFrontendRunWithYarnTest extends CliFrontendTestBase { @Rule @@ -74,7 +70,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase { String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath(); Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, mode); configuration.setString(JobManagerOptions.ADDRESS, "localhost"); configuration.setInteger(JobManagerOptions.PORT, 8081); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 65f813e..7ba2150 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -41,7 +41,6 @@ import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; -import org.apache.flink.yarn.LegacyYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -163,8 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId private final String yarnPropertiesFileLocation; - private final boolean isNewMode; - private final YarnConfiguration yarnConfiguration; public FlinkYarnSessionCli( @@ -185,8 +182,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); this.acceptInteractiveInput = acceptInteractiveInput; - this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE); - // Create the command line options query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); @@ -375,10 +370,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId } private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { - if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option! - LOG.error("Missing required argument {}", container.getOpt()); - printUsage(); - throw new IllegalArgumentException("Missing required argument " + container.getOpt()); + if (cmd.hasOption(container.getOpt())) { // number of containers is required option! + LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt()); } // TODO: The number of task manager should be deprecated soon @@ -989,20 +982,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId yarnClient.init(yarnConfiguration); yarnClient.start(); - if (isNewMode) { - return new YarnClusterDescriptor( - configuration, - yarnConfiguration, - configurationDirectory, - yarnClient, - false); - } else { - return new LegacyYarnClusterDescriptor( - configuration, - yarnConfiguration, - configurationDirectory, - yarnClient, - false); - } + return new YarnClusterDescriptor( + configuration, + yarnConfiguration, + configurationDirectory, + yarnClient, + false); } }
