This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b32630df20a17a16b768f0a0c42d6cc45d358a1f
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri Nov 20 17:16:36 2020 +0100

    [FLINK-19969] CLI print run-application help msg
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +++++++---
 .../apache/flink/client/cli/CliFrontendParser.java | 15 +++++++++++-
 .../client/deployment/ClusterClientFactory.java    | 11 +++++++++
 .../deployment/ClusterClientServiceLoader.java     |  8 +++++++
 .../DefaultClusterClientServiceLoader.java         | 28 ++++++++++++++++++++++
 .../cli/util/DummyClusterClientServiceLoader.java  |  7 ++++++
 .../kubernetes/KubernetesClusterClientFactory.java |  7 ++++++
 .../flink/yarn/YarnClusterClientFactory.java       |  7 ++++++
 8 files changed, 91 insertions(+), 4 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 822ed49..27a2a51 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -176,7 +176,7 @@ public class CliFrontend {
                final CommandLine commandLine = getCommandLine(commandOptions, 
args, true);
 
                if (commandLine.hasOption(HELP_OPTION.getOpt())) {
-                       CliFrontendParser.printHelpForRun(customCommandLines);
+                       
CliFrontendParser.printHelpForRunApplication(getApplicationModeTargetNames());
                        return;
                }
 
@@ -207,6 +207,12 @@ public class CliFrontend {
                deployer.run(effectiveConfiguration, applicationConfiguration);
        }
 
+       private static String getApplicationModeTargetNames() {
+               return new 
DefaultClusterClientServiceLoader().getApplicationModeTargetNames()
+                               .map(name -> String.format("\"%s\"", name))
+                               .collect(Collectors.joining(", "));
+       }
+
        /**
         * Executions the run action.
         *
@@ -953,7 +959,7 @@ public class CliFrontend {
 
                // check for action
                if (args.length < 1) {
-                       CliFrontendParser.printHelp(customCommandLines);
+                       CliFrontendParser.printHelp(customCommandLines, 
getApplicationModeTargetNames());
                        System.out.println("Please specify an action.");
                        return 1;
                }
@@ -990,7 +996,7 @@ public class CliFrontend {
                                        return 0;
                                case "-h":
                                case "--help":
-                                       
CliFrontendParser.printHelp(customCommandLines);
+                                       
CliFrontendParser.printHelp(customCommandLines, 
getApplicationModeTargetNames());
                                        return 0;
                                case "-v":
                                case "--version":
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 0d82334..a5c37ab 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -341,12 +341,13 @@ public class CliFrontendParser {
        /**
         * Prints the help for the client.
         */
-       public static void printHelp(Collection<CustomCommandLine> 
customCommandLines) {
+       public static void printHelp(Collection<CustomCommandLine> 
customCommandLines, String availableApplicationModeTargets) {
                System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
                System.out.println();
                System.out.println("The following actions are available:");
 
                printHelpForRun(customCommandLines);
+               printHelpForRunApplication(availableApplicationModeTargets);
                printHelpForInfo();
                printHelpForList(customCommandLines);
                printHelpForStop(customCommandLines);
@@ -371,6 +372,18 @@ public class CliFrontendParser {
                System.out.println();
        }
 
+       public static void printHelpForRunApplication(String 
availableApplicationModeTargets) {
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setLeftPadding(5);
+               formatter.setWidth(80);
+
+               System.out.println("\nAction \"run-application\" runs an 
application in Application Mode.");
+               System.out.println("\n  Syntax: run-application -t [" + 
availableApplicationModeTargets + "] [OPTIONS] <jar-file> <arguments>");
+               formatter.setSyntaxPrefix("  \"run-application\" action 
options:");
+               formatter.printHelp(" ", new 
Options().addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES));
+               System.out.println();
+       }
+
        public static void printHelpForInfo() {
                HelpFormatter formatter = new HelpFormatter();
                formatter.setLeftPadding(5);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
index 3c0c5cc..a70ee32 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 /**
  * A factory containing all the necessary information for creating clients to 
Flink clusters.
  */
@@ -62,4 +64,13 @@ public interface ClusterClientFactory<ClusterID> {
         * @return the corresponding {@link ClusterSpecification} for a new 
Flink cluster
         */
        ClusterSpecification getClusterSpecification(Configuration 
configuration);
+
+       /**
+        * Returns the option to be used when trying to execute an application 
in Application Mode
+        * using this cluster client factory, or an {@link Optional#empty()} if 
the environment of
+        * this cluster client factory does not support Application Mode.
+        */
+       default Optional<String> getApplicationTargetName() {
+               return Optional.empty();
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
index 51eef13..ac9402a 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
@@ -20,6 +20,8 @@ package org.apache.flink.client.deployment;
 
 import org.apache.flink.configuration.Configuration;
 
+import java.util.stream.Stream;
+
 /**
  * An interface used to discover the appropriate {@link ClusterClientFactory 
cluster client factory} based on the
  * provided {@link Configuration}.
@@ -33,4 +35,10 @@ public interface ClusterClientServiceLoader {
         * @return the appropriate {@link ClusterClientFactory}.
         */
        <ClusterID> ClusterClientFactory<ClusterID> 
getClusterClientFactory(final Configuration configuration);
+
+       /**
+        * Loads and returns a stream of the names of all available
+        * execution target names for {@code Application Mode}.
+        */
+       Stream<String> getApplicationModeTargetNames();
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index f79a51e..96e5109 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -83,4 +86,29 @@ public class DefaultClusterClientServiceLoader implements 
ClusterClientServiceLo
 
                return (ClusterClientFactory<ClusterID>) 
compatibleFactories.get(0);
        }
+
+       @Override
+       public Stream<String> getApplicationModeTargetNames() {
+               final ServiceLoader<ClusterClientFactory> loader =
+                               ServiceLoader.load(ClusterClientFactory.class);
+
+               final List<String> result = new ArrayList<>();
+
+               final Iterator<ClusterClientFactory> it = loader.iterator();
+               while (it.hasNext()) {
+                       try {
+                               final ClusterClientFactory clientFactory = 
it.next();
+
+                               final Optional<String> applicationName = 
clientFactory.getApplicationTargetName();
+                               if (applicationName.isPresent()) {
+                                       result.add(applicationName.get());
+                               }
+
+                       } catch (ServiceConfigurationError e) {
+                               // cannot be loaded, most likely because Hadoop 
is not
+                               // in the classpath, we can ignore it for now.
+                       }
+               }
+               return result.stream();
+       }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
index f58383c..9622cf7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.stream.Stream;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -41,4 +43,9 @@ public class DummyClusterClientServiceLoader<ClusterID> 
implements ClusterClient
                checkNotNull(configuration);
                return new DummyClusterClientFactory<>(clusterClient);
        }
+
+       @Override
+       public Stream<String> getApplicationModeTargetNames() {
+               return Stream.empty();
+       }
 }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
index 401711a..53a6c9e 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -66,6 +68,11 @@ public class KubernetesClusterClientFactory extends 
AbstractContainerizedCluster
                return 
configuration.getString(KubernetesConfigOptions.CLUSTER_ID);
        }
 
+       @Override
+       public Optional<String> getApplicationTargetName() {
+               return 
Optional.of(KubernetesDeploymentTarget.APPLICATION.getName());
+       }
+
        private String generateClusterId() {
                final String randomID = new AbstractID().toString();
                return (CLUSTER_ID_PREFIX + randomID).substring(0, 
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 776172f..dbf7037 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -69,6 +71,11 @@ public class YarnClusterClientFactory extends 
AbstractContainerizedClusterClient
                return clusterId != null ? 
ConverterUtils.toApplicationId(clusterId) : null;
        }
 
+       @Override
+       public Optional<String> getApplicationTargetName() {
+               return Optional.of(YarnDeploymentTarget.APPLICATION.getName());
+       }
+
        private YarnClusterDescriptor getClusterDescriptor(Configuration 
configuration) {
                final YarnClient yarnClient = YarnClient.createYarnClient();
                final YarnConfiguration yarnConfiguration = new 
YarnConfiguration();

Reply via email to