[FLINK-8872][flip6] fix yarn detached mode command parsing

The detached flag, if given as "-yd", was not passed correctly to the
CliFrontend, which caused the CLI to wait for submitted jobs to finish instead
of detaching from the execution.

[FLINK-8872][yarn] add tests for YARN detached mode command line parsing with 
CliFrontend

- create a test-jar of flink-clients
- create CliFrontendRunWithYarnTest based on CliFrontendRunTest that verifies
  CliFrontend's parsing in conjunction with FlinkYarnSessionCli
-> verify detached mode in this test (can be extended further in the future)

This closes #5672.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81e1e4c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81e1e4c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81e1e4c3

Branch: refs/heads/release-1.5
Commit: 81e1e4c312b467dc64c664e30a7132a9f7d55140
Parents: 4317185
Author: Nico Kruber <n...@data-artisans.com>
Authored: Mon Mar 5 18:24:17 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 13 12:17:57 2018 -0700

----------------------------------------------------------------------
 flink-clients/pom.xml                           |  14 +-
 .../flink/client/cli/CliFrontendParser.java     |   9 ++
 .../org/apache/flink/client/cli/DefaultCLI.java |   1 +
 .../apache/flink/client/cli/ProgramOptions.java |   4 +-
 .../flink/client/program/ClusterClient.java     |   2 +-
 .../client/program/rest/RestClusterClient.java  |   2 +-
 .../flink/client/cli/CliFrontendRunTest.java    |   2 +-
 flink-yarn-tests/pom.xml                        |   8 +
 .../flink/yarn/CliFrontendRunWithYarnTest.java  | 148 +++++++++++++++++++
 .../flink/yarn/util/FakeClusterClient.java      |  79 ++++++++++
 .../util/NonDeployingYarnClusterDescriptor.java |  98 ++++++++++++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  22 +--
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  19 +++
 13 files changed, 392 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 399d080..c7f9e55 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -96,6 +96,18 @@ under the License.
        <build>
                <plugins>
                        <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
                                <artifactId>maven-assembly-plugin</artifactId>
                                <version>2.4</version><!--$NO-MVN-MAN-VER$-->
                                <executions>
@@ -120,7 +132,7 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
-                       <!--Remove the external jar test code from the 
test-classes directory since it musn't be in the
+                       <!--Remove the external jar test code from the 
test-classes directory since it mustn't be in the
                        classpath when running the tests to actually test 
whether the user code class loader
                        is properly used.-->
                        <plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 5a6c0ff..1588aac 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,13 @@ public class CliFrontendParser {
        public static final Option DETACHED_OPTION = new Option("d", 
"detached", false, "If present, runs " +
                        "the job in detached mode");
 
+       /**
+        * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for 
both YARN and non-YARN deployments
+        */
+       @Deprecated
+       public static final Option YARN_DETACHED_OPTION = new Option("yd", 
"yarndetached", false, "If present, runs " +
+               "the job in detached mode (deprecated; use non-YARN specific 
option instead)");
+
        static final Option ARGS_OPTION = new Option("a", "arguments", true,
                        "Program arguments. Arguments can also be added without 
-a, simply as trailing parameters.");
 
@@ -117,6 +124,7 @@ public class CliFrontendParser {
 
                LOGGING_OPTION.setRequired(false);
                DETACHED_OPTION.setRequired(false);
+               YARN_DETACHED_OPTION.setRequired(false);
 
                ARGS_OPTION.setRequired(false);
                ARGS_OPTION.setArgName("programArgs");
@@ -158,6 +166,7 @@ public class CliFrontendParser {
                options.addOption(ARGS_OPTION);
                options.addOption(LOGGING_OPTION);
                options.addOption(DETACHED_OPTION);
+               options.addOption(YARN_DETACHED_OPTION);
                return options;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 43efc63..e9ed9af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -40,6 +40,7 @@ public class DefaultCLI extends 
AbstractCustomCommandLine<StandaloneClusterId> {
 
        @Override
        public boolean isActive(CommandLine commandLine) {
+               // always active because we can try to read a JobManager 
address from the config
                return true;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index df25e67..1acda1b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static 
org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static 
org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static 
org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
  * Base class for command line options that refer to a JAR file program.
@@ -112,7 +113,8 @@ public abstract class ProgramOptions extends 
CommandLineOptions {
                }
 
                stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
-               detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
+               detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || 
line.hasOption(
+                       YARN_DETACHED_OPTION.getOpt());
 
                if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
                        String savepointPath = 
line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index fbaa515..2ef0b2e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -385,7 +385,7 @@ public abstract class ClusterClient<T> {
                        return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
                }
                else if (prog.isUsingInteractiveMode()) {
-                       log.info("Starting program in interactive mode");
+                       log.info("Starting program in interactive mode 
(detached: {})", isDetached());
 
                        final List<URL> libraries;
                        if (hasUserJarsInClassPath(prog.getAllLibraries())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 4a4f993..a6f676e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -235,7 +235,7 @@ public class RestClusterClient<T> extends ClusterClient<T> 
implements NewCluster
 
        @Override
        public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
-               log.info("Submitting job {}.", jobGraph.getJobID());
+               log.info("Submitting job {} (detached: {}).", 
jobGraph.getJobID(), isDetached());
 
                final CompletableFuture<JobSubmissionResult> 
jobSubmissionFuture = submitJob(jobGraph);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index efa6a39..ba51b69 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -144,7 +144,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase 
{
 
        // 
--------------------------------------------------------------------------------------------
 
-       private static void verifyCliFrontend(
+       public static void verifyCliFrontend(
                        AbstractCustomCommandLine<?> cli,
                        String[] parameters,
                        int expectedParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index cd76cb9..f1d8a4a 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -67,6 +67,14 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-clients_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- Needed for the streaming wordcount example -->
                <dependency>
                        <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
new file mode 100644
index 0000000..d6a029f
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendTestBase;
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.util.FakeClusterClient;
+import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
+
+/**
+ * Tests for the RUN command using a {@link 
org.apache.flink.yarn.cli.FlinkYarnSessionCli} inside
+ * the {@link org.apache.flink.client.cli.CliFrontend}.
+ *
+ * @see org.apache.flink.client.cli.CliFrontendRunTest
+ */
+@RunWith(Parameterized.class)
+public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
+
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
+       @BeforeClass
+       public static void init() {
+               CliFrontendTestUtils.pipeSystemOutToNull();
+       }
+
+       @AfterClass
+       public static void shutdown() {
+               CliFrontendTestUtils.restoreSystemOut();
+       }
+
+       @Test
+       public void testRun() throws Exception {
+               String testJarPath = 
getTestJarPath("BatchWordCount.jar").getAbsolutePath();
+
+               Configuration configuration = new Configuration();
+               configuration.setString(CoreOptions.MODE, mode);
+               configuration.setString(JobManagerOptions.ADDRESS, "localhost");
+               configuration.setInteger(JobManagerOptions.PORT, 8081);
+
+               FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli(
+                       configuration,
+                       tmp.getRoot().getAbsolutePath(),
+                       "y",
+                       "yarn");
+
+               // test detached mode
+               {
+                       String[] parameters = {"-m", "yarn-cluster", "-yn", 
"1", "-p", "2", "-d", testJarPath};
+                       verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+               }
+
+               // test detached mode
+               {
+                       String[] parameters = {"-m", "yarn-cluster", "-yn", 
"1", "-p", "2", "-yd", testJarPath};
+                       verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+               }
+       }
+
+       private static class TestingFlinkYarnSessionCli extends 
FlinkYarnSessionCli {
+               @SuppressWarnings("unchecked")
+               private final ClusterClient<ApplicationId> clusterClient;
+               private final String configurationDirectory;
+
+               private TestingFlinkYarnSessionCli(
+                               Configuration configuration,
+                               String configurationDirectory,
+                               String shortPrefix,
+                               String longPrefix) throws Exception {
+                       super(configuration, configurationDirectory, 
shortPrefix, longPrefix);
+
+                       this.clusterClient = new 
FakeClusterClient(configuration);
+                       this.configurationDirectory = configurationDirectory;
+               }
+
+               @Override
+               public AbstractYarnClusterDescriptor 
createClusterDescriptor(CommandLine commandLine)
+                       throws FlinkException {
+                       AbstractYarnClusterDescriptor parent = 
super.createClusterDescriptor(commandLine);
+                       return new NonDeployingDetachedYarnClusterDescriptor(
+                                       parent.getFlinkConfiguration(),
+                                       (YarnConfiguration) 
parent.getYarnClient().getConfig(),
+                                       configurationDirectory,
+                                       parent.getYarnClient(),
+                                       clusterClient);
+               }
+       }
+
+       private static class NonDeployingDetachedYarnClusterDescriptor extends 
NonDeployingYarnClusterDescriptor {
+
+               NonDeployingDetachedYarnClusterDescriptor(
+                       Configuration flinkConfiguration,
+                       YarnConfiguration yarnConfiguration, String 
configurationDirectory,
+                       YarnClient yarnClient,
+                       ClusterClient<ApplicationId> clusterClient) {
+                       super(flinkConfiguration, yarnConfiguration, 
configurationDirectory, yarnClient,
+                               clusterClient);
+               }
+
+               @Override
+               public ClusterClient<ApplicationId> deployJobCluster(
+                               ClusterSpecification clusterSpecification, 
JobGraph jobGraph, boolean detached) {
+                       assertTrue(detached);
+                       return super.deployJobCluster(clusterSpecification, 
jobGraph, true);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
new file mode 100644
index 0000000..f6f6a71
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Dummy {@link ClusterClient} for testing purposes (extend as needed).
+ */
+public class FakeClusterClient extends ClusterClient<ApplicationId> {
+
+       public FakeClusterClient(Configuration flinkConfig) throws Exception {
+               super(flinkConfig);
+       }
+
+       @Override
+       public void waitForClusterToBeReady() {
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               return "";
+       }
+
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+
+       @Override
+       public ApplicationId getClusterId() {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+
+       @Override
+       public int getMaxSlots() {
+               return 10;
+       }
+
+       @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return false;
+       }
+
+       @Override
+       public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
new file mode 100644
index 0000000..4916b73
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Dummy {@link AbstractYarnClusterDescriptor} without an actual deployment 
for tests.
+ */
+public class NonDeployingYarnClusterDescriptor extends 
AbstractYarnClusterDescriptor {
+
+       private final ClusterClient<ApplicationId> clusterClient;
+
+       public NonDeployingYarnClusterDescriptor(
+                       Configuration flinkConfiguration,
+                       YarnConfiguration yarnConfiguration,
+                       String configurationDirectory,
+                       YarnClient yarnClient,
+                       ClusterClient<ApplicationId> clusterClient) {
+               super(flinkConfiguration, yarnConfiguration, 
configurationDirectory, yarnClient, true);
+
+               //noinspection unchecked
+               this.clusterClient = Preconditions.checkNotNull(clusterClient);
+       }
+
+       @Override
+       public String getClusterDescription() {
+               // return parent.getClusterDescription();
+               return "NonDeployingYarnClusterDescriptor";
+       }
+
+       @Override
+       protected ClusterClient<ApplicationId> createYarnClusterClient(
+                       AbstractYarnClusterDescriptor descriptor,
+                       int numberTaskManagers,
+                       int slotsPerTaskManager,
+                       ApplicationReport report,
+                       Configuration flinkConfiguration,
+                       boolean perJobCluster) {
+               return clusterClient;
+       }
+
+       @Override
+       public ClusterClient<ApplicationId> retrieve(ApplicationId clusterId) {
+               return clusterClient;
+       }
+
+       @Override
+       public ClusterClient<ApplicationId> 
deploySessionCluster(ClusterSpecification clusterSpecification) {
+               return clusterClient;
+       }
+
+       @Override
+       public ClusterClient<ApplicationId> deployJobCluster(
+                       ClusterSpecification clusterSpecification, JobGraph 
jobGraph, boolean detached) {
+               return clusterClient;
+       }
+
+       @Override
+       public void terminateCluster(ApplicationId clusterId) {
+       }
+
+       @Override
+       protected String getYarnSessionClusterEntrypoint() {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+
+       @Override
+       protected String getYarnJobClusterEntrypoint() {
+               throw new UnsupportedOperationException("Not needed in test.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 16abffa..fe04662 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn.cli;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
@@ -84,6 +83,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static 
org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
 /**
@@ -127,7 +128,6 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
        private final Option tmMemory;
        private final Option container;
        private final Option slots;
-       private final Option detached;
        private final Option zookeeperNamespace;
        private final Option help;
 
@@ -162,9 +162,6 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
 
        private final YarnConfiguration yarnConfiguration;
 
-       //------------------------------------ Internal fields 
-------------------------
-       private boolean detachedMode = false;
-
        public FlinkYarnSessionCli(
                        Configuration configuration,
                        String configurationDirectory,
@@ -202,7 +199,6 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                        .valueSeparator()
                        .desc("use value for given property")
                        .build();
-               detached = new Option(shortPrefix + "d", longPrefix + 
"detached", false, "Start detached");
                streaming = new Option(shortPrefix + "st", longPrefix + 
"streaming", false, "Start Flink in streaming mode");
                name = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
                zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + 
"zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for 
high availability mode");
@@ -218,7 +214,8 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                allOptions.addOption(shipPath);
                allOptions.addOption(slots);
                allOptions.addOption(dynamicproperties);
-               allOptions.addOption(detached);
+               allOptions.addOption(DETACHED_OPTION);
+               allOptions.addOption(YARN_DETACHED_OPTION);
                allOptions.addOption(streaming);
                allOptions.addOption(name);
                allOptions.addOption(applicationId);
@@ -348,8 +345,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
 
                
yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-               if (cmd.hasOption(detached.getOpt()) || 
cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
-                       this.detachedMode = true;
+               if (cmd.hasOption(YARN_DETACHED_OPTION.getOpt()) || 
cmd.hasOption(DETACHED_OPTION.getOpt())) {
                        yarnClusterDescriptor.setDetachedMode(true);
                }
 
@@ -519,7 +515,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
 
                for (Option option : commandLine.getOptions()) {
                        if (allOptions.hasOption(option.getOpt())) {
-                               if (!option.getOpt().equals(detached.getOpt())) 
{
+                               if (!isDetachedOption(option)) {
                                        // don't resume from properties file if 
yarn options have been specified
                                        canApplyYarnProperties = false;
                                        break;
@@ -530,6 +526,10 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                return canApplyYarnProperties;
        }
 
+       private boolean isDetachedOption(Option option) {
+               return option.getOpt().equals(YARN_DETACHED_OPTION.getOpt()) || 
option.getOpt().equals(DETACHED_OPTION.getOpt());
+       }
+
        private Configuration applyYarnProperties(Configuration configuration) 
throws FlinkException {
                final Configuration effectiveConfiguration = new 
Configuration(configuration);
 
@@ -621,7 +621,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                                        }
                                }
 
-                               if (detachedMode) {
+                               if (yarnClusterDescriptor.isDetachedMode()) {
                                        LOG.info("The Flink YARN client has 
been started in detached mode. In order to stop " +
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
                                                "yarn application -kill " + 
yarnApplicationId);

http://git-wip-us.apache.org/repos/asf/flink/blob/81e1e4c3/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 62110ed..12c0354 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -119,6 +119,25 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        @Test
+       public void testCorrectSettingOfDetachedMode() throws Exception {
+               String[] params =
+                       new String[] {"-yd"};
+
+               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+                       new Configuration(),
+                       tmp.getRoot().getAbsolutePath(),
+                       "y",
+                       "yarn");
+
+               final CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
+
+               AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createClusterDescriptor(commandLine);
+
+               // each task manager has 3 slots but the parallelism is 7. Thus 
the slots should be increased.
+               assertTrue(descriptor.isDetachedMode());
+       }
+
+       @Test
        public void testZookeeperNamespaceProperty() throws Exception {
                String zkNamespaceCliInput = "flink_test_namespace";
 

Reply via email to