[FLINK-8872][flip6] fix yarn detached mode command parsing The detached flag, when given as "-yd", was not passed correctly into the CliFrontend, which resulted in the CLI waiting for submitted jobs to finish instead of detaching from the execution.
[FLINK-8872][yarn] add tests for YARN detached mode command line parsing with CliFrontend - create a test-jar of flink-clients - create CliFrontendRunWithYarnTest based on CliFrontendRunTest that verifies CliFrontend's parsing in conjunction with FlinkYarnSessionCli -> verify detached mode in this test (can be extended further in the future) This closes #5672. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81e1e4c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81e1e4c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81e1e4c3 Branch: refs/heads/release-1.5 Commit: 81e1e4c312b467dc64c664e30a7132a9f7d55140 Parents: 4317185 Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Mar 5 18:24:17 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Apr 13 12:17:57 2018 -0700 ---------------------------------------------------------------------- flink-clients/pom.xml | 14 +- .../flink/client/cli/CliFrontendParser.java | 9 ++ .../org/apache/flink/client/cli/DefaultCLI.java | 1 + .../apache/flink/client/cli/ProgramOptions.java | 4 +- .../flink/client/program/ClusterClient.java | 2 +- .../client/program/rest/RestClusterClient.java | 2 +- .../flink/client/cli/CliFrontendRunTest.java | 2 +- flink-yarn-tests/pom.xml | 8 + .../flink/yarn/CliFrontendRunWithYarnTest.java | 148 +++++++++++++++++++ .../flink/yarn/util/FakeClusterClient.java | 79 ++++++++++ .../util/NonDeployingYarnClusterDescriptor.java | 98 ++++++++++++ .../flink/yarn/cli/FlinkYarnSessionCli.java | 22 +-- .../flink/yarn/FlinkYarnSessionCliTest.java | 19 +++ 13 files changed, 392 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/pom.xml ---------------------------------------------------------------------- diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 
399d080..c7f9e55 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -96,6 +96,18 @@ under the License. <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version><!--$NO-MVN-MAN-VER$--> <executions> @@ -120,7 +132,7 @@ under the License. </execution> </executions> </plugin> - <!--Remove the external jar test code from the test-classes directory since it musn't be in the + <!--Remove the external jar test code from the test-classes directory since it mustn't be in the classpath when running the tests to actually test whether the user code class loader is properly used.--> <plugin> http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 5a6c0ff..1588aac 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -62,6 +62,13 @@ public class CliFrontendParser { public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " + "the job in detached mode"); + /** + * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments + */ + @Deprecated + public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " + + "the job in detached mode (deprecated; use non-YARN specific option instead)"); + static final Option ARGS_OPTION = new 
Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters."); @@ -117,6 +124,7 @@ public class CliFrontendParser { LOGGING_OPTION.setRequired(false); DETACHED_OPTION.setRequired(false); + YARN_DETACHED_OPTION.setRequired(false); ARGS_OPTION.setRequired(false); ARGS_OPTION.setArgName("programArgs"); @@ -158,6 +166,7 @@ public class CliFrontendParser { options.addOption(ARGS_OPTION); options.addOption(LOGGING_OPTION); options.addOption(DETACHED_OPTION); + options.addOption(YARN_DETACHED_OPTION); return options; } http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 43efc63..e9ed9af 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -40,6 +40,7 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> { @Override public boolean isActive(CommandLine commandLine) { + // always active because we can try to read a JobManager address from the config return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index df25e67..1acda1b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -38,6 +38,7 @@ import static 
org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION; /** * Base class for command line options that refer to a JAR file program. @@ -112,7 +113,8 @@ public abstract class ProgramOptions extends CommandLineOptions { } stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt()); - detachedMode = line.hasOption(DETACHED_OPTION.getOpt()); + detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption( + YARN_DETACHED_OPTION.getOpt()); if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) { String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index fbaa515..2ef0b2e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -385,7 +385,7 @@ public abstract class ClusterClient<T> { return run(jobWithJars, parallelism, prog.getSavepointSettings()); } else if (prog.isUsingInteractiveMode()) { - log.info("Starting program in interactive mode"); + log.info("Starting program in interactive mode (detached: {})", isDetached()); final List<URL> libraries; if (hasUserJarsInClassPath(prog.getAllLibraries())) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 4a4f993..a6f676e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -235,7 +235,7 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster @Override public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - log.info("Submitting job {}.", jobGraph.getJobID()); + log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached()); final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index efa6a39..ba51b69 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -144,7 +144,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase { // -------------------------------------------------------------------------------------------- - private static void verifyCliFrontend( + public static void verifyCliFrontend( AbstractCustomCommandLine<?> cli, String[] parameters, int 
expectedParallelism, http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index cd76cb9..f1d8a4a 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -67,6 +67,14 @@ under the License. <type>test-jar</type> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- Needed for the streaming wordcount example --> <dependency> <groupId>org.apache.flink</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java new file mode 100644 index 0000000..d6a029f --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.cli.CliFrontendTestBase; +import org.apache.flink.client.cli.CliFrontendTestUtils; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkException; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.util.FakeClusterClient; +import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend; +import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath; + +/** + * Tests for the RUN command using a {@link org.apache.flink.yarn.cli.FlinkYarnSessionCli} inside + * the {@link org.apache.flink.client.cli.CliFrontend}. 
+ * + * @see org.apache.flink.client.cli.CliFrontendRunTest + */ +@RunWith(Parameterized.class) +public class CliFrontendRunWithYarnTest extends CliFrontendTestBase { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @BeforeClass + public static void init() { + CliFrontendTestUtils.pipeSystemOutToNull(); + } + + @AfterClass + public static void shutdown() { + CliFrontendTestUtils.restoreSystemOut(); + } + + @Test + public void testRun() throws Exception { + String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath(); + + Configuration configuration = new Configuration(); + configuration.setString(CoreOptions.MODE, mode); + configuration.setString(JobManagerOptions.ADDRESS, "localhost"); + configuration.setInteger(JobManagerOptions.PORT, 8081); + + FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + + // test detached mode + { + String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-d", testJarPath}; + verifyCliFrontend(yarnCLI, parameters, 2, true, true); + } + + // test detached mode + { + String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-yd", testJarPath}; + verifyCliFrontend(yarnCLI, parameters, 2, true, true); + } + } + + private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli { + @SuppressWarnings("unchecked") + private final ClusterClient<ApplicationId> clusterClient; + private final String configurationDirectory; + + private TestingFlinkYarnSessionCli( + Configuration configuration, + String configurationDirectory, + String shortPrefix, + String longPrefix) throws Exception { + super(configuration, configurationDirectory, shortPrefix, longPrefix); + + this.clusterClient = new FakeClusterClient(configuration); + this.configurationDirectory = configurationDirectory; + } + + @Override + public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine) + throws 
FlinkException { + AbstractYarnClusterDescriptor parent = super.createClusterDescriptor(commandLine); + return new NonDeployingDetachedYarnClusterDescriptor( + parent.getFlinkConfiguration(), + (YarnConfiguration) parent.getYarnClient().getConfig(), + configurationDirectory, + parent.getYarnClient(), + clusterClient); + } + } + + private static class NonDeployingDetachedYarnClusterDescriptor extends NonDeployingYarnClusterDescriptor { + + NonDeployingDetachedYarnClusterDescriptor( + Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, String configurationDirectory, + YarnClient yarnClient, + ClusterClient<ApplicationId> clusterClient) { + super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, + clusterClient); + } + + @Override + public ClusterClient<ApplicationId> deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { + assertTrue(detached); + return super.deployJobCluster(clusterSpecification, jobGraph, true); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java new file mode 100644 index 0000000..f6f6a71 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.util; + +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.net.URL; +import java.util.List; + +/** + * Dummy {@link ClusterClient} for testing purposes (extend as needed). + */ +public class FakeClusterClient extends ClusterClient<ApplicationId> { + + public FakeClusterClient(Configuration flinkConfig) throws Exception { + super(flinkConfig); + } + + @Override + public void waitForClusterToBeReady() { + } + + @Override + public String getWebInterfaceURL() { + return ""; + } + + @Override + public GetClusterStatusResponse getClusterStatus() { + throw new UnsupportedOperationException("Not needed in test."); + } + + @Override + public List<String> getNewMessages() { + throw new UnsupportedOperationException("Not needed in test."); + } + + @Override + public ApplicationId getClusterId() { + throw new UnsupportedOperationException("Not needed in test."); + } + + @Override + public int getMaxSlots() { + return 10; + } + + @Override + public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { + return false; + } + + @Override + public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) { + throw new UnsupportedOperationException("Not needed in test."); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java new file mode 100644 index 0000000..4916b73 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.util; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.AbstractYarnClusterDescriptor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Dummy {@link AbstractYarnClusterDescriptor} without an actual deployment for tests. + */ +public class NonDeployingYarnClusterDescriptor extends AbstractYarnClusterDescriptor { + + private final ClusterClient<ApplicationId> clusterClient; + + public NonDeployingYarnClusterDescriptor( + Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, + String configurationDirectory, + YarnClient yarnClient, + ClusterClient<ApplicationId> clusterClient) { + super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, true); + + //noinspection unchecked + this.clusterClient = Preconditions.checkNotNull(clusterClient); + } + + @Override + public String getClusterDescription() { + // return parent.getClusterDescription(); + return "NonDeployingYarnClusterDescriptor"; + } + + @Override + protected ClusterClient<ApplicationId> createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + int numberTaskManagers, + int slotsPerTaskManager, + ApplicationReport report, + Configuration flinkConfiguration, + boolean perJobCluster) { + return clusterClient; + } + + @Override + public ClusterClient<ApplicationId> retrieve(ApplicationId clusterId) { + return clusterClient; + } + + @Override + public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) { + return clusterClient; + } + + @Override + 
public ClusterClient<ApplicationId> deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { + return clusterClient; + } + + @Override + public void terminateCluster(ApplicationId clusterId) { + } + + @Override + protected String getYarnSessionClusterEntrypoint() { + throw new UnsupportedOperationException("Not needed in test."); + } + + @Override + protected String getYarnJobClusterEntrypoint() { + throw new UnsupportedOperationException("Not needed in test."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 16abffa..fe04662 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn.cli; import org.apache.flink.client.cli.AbstractCustomCommandLine; import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; @@ -84,6 +83,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION; import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID; /** @@ -127,7 +128,6 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId private final Option tmMemory; private final Option container; private final Option slots; - private final Option detached; private final Option zookeeperNamespace; private final Option help; @@ -162,9 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId private final YarnConfiguration yarnConfiguration; - //------------------------------------ Internal fields ------------------------- - private boolean detachedMode = false; - public FlinkYarnSessionCli( Configuration configuration, String configurationDirectory, @@ -202,7 +199,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId .valueSeparator() .desc("use value for given property") .build(); - detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); @@ -218,7 +214,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId allOptions.addOption(shipPath); allOptions.addOption(slots); allOptions.addOption(dynamicproperties); - allOptions.addOption(detached); + allOptions.addOption(DETACHED_OPTION); + allOptions.addOption(YARN_DETACHED_OPTION); allOptions.addOption(streaming); allOptions.addOption(name); allOptions.addOption(applicationId); @@ -348,8 +345,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); - if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { - this.detachedMode = true; + if 
(cmd.hasOption(YARN_DETACHED_OPTION.getOpt()) || cmd.hasOption(DETACHED_OPTION.getOpt())) { yarnClusterDescriptor.setDetachedMode(true); } @@ -519,7 +515,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId for (Option option : commandLine.getOptions()) { if (allOptions.hasOption(option.getOpt())) { - if (!option.getOpt().equals(detached.getOpt())) { + if (!isDetachedOption(option)) { // don't resume from properties file if yarn options have been specified canApplyYarnProperties = false; break; @@ -530,6 +526,10 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId return canApplyYarnProperties; } + private boolean isDetachedOption(Option option) { + return option.getOpt().equals(YARN_DETACHED_OPTION.getOpt()) || option.getOpt().equals(DETACHED_OPTION.getOpt()); + } + private Configuration applyYarnProperties(Configuration configuration) throws FlinkException { final Configuration effectiveConfiguration = new Configuration(configuration); @@ -621,7 +621,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId } } - if (detachedMode) { + if (yarnClusterDescriptor.isDetachedMode()) { LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnApplicationId); http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 62110ed..12c0354 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -119,6 +119,25 @@ public class FlinkYarnSessionCliTest extends TestLogger { } @Test + public void testCorrectSettingOfDetachedMode() throws Exception { + String[] params = + new String[] {"-yd"}; + + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + + final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); + + AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine); + + // the deprecated "-yd" flag must enable detached mode on the created cluster descriptor + assertTrue(descriptor.isDetachedMode()); + } + + @Test public void testZookeeperNamespaceProperty() throws Exception { String zkNamespaceCliInput = "flink_test_namespace";