This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b32630df20a17a16b768f0a0c42d6cc45d358a1f Author: Kostas Kloudas <[email protected]> AuthorDate: Fri Nov 20 17:16:36 2020 +0100 [FLINK-19969] CLI print run-application help msg --- .../org/apache/flink/client/cli/CliFrontend.java | 12 +++++++--- .../apache/flink/client/cli/CliFrontendParser.java | 15 +++++++++++- .../client/deployment/ClusterClientFactory.java | 11 +++++++++ .../deployment/ClusterClientServiceLoader.java | 8 +++++++ .../DefaultClusterClientServiceLoader.java | 28 ++++++++++++++++++++++ .../cli/util/DummyClusterClientServiceLoader.java | 7 ++++++ .../kubernetes/KubernetesClusterClientFactory.java | 7 ++++++ .../flink/yarn/YarnClusterClientFactory.java | 7 ++++++ 8 files changed, 91 insertions(+), 4 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 822ed49..27a2a51 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -176,7 +176,7 @@ public class CliFrontend { final CommandLine commandLine = getCommandLine(commandOptions, args, true); if (commandLine.hasOption(HELP_OPTION.getOpt())) { - CliFrontendParser.printHelpForRun(customCommandLines); + CliFrontendParser.printHelpForRunApplication(getApplicationModeTargetNames()); return; } @@ -207,6 +207,12 @@ public class CliFrontend { deployer.run(effectiveConfiguration, applicationConfiguration); } + private static String getApplicationModeTargetNames() { + return new DefaultClusterClientServiceLoader().getApplicationModeTargetNames() + .map(name -> String.format("\"%s\"", name)) + .collect(Collectors.joining(", ")); + } + /** * Executions the run action. * @@ -953,7 +959,7 @@ public class CliFrontend { // check for action if (args.length < 1) { - CliFrontendParser.printHelp(customCommandLines); + CliFrontendParser.printHelp(customCommandLines, getApplicationModeTargetNames()); System.out.println("Please specify an action."); return 1; } @@ -990,7 +996,7 @@ public class CliFrontend { return 0; case "-h": case "--help": - CliFrontendParser.printHelp(customCommandLines); + CliFrontendParser.printHelp(customCommandLines, getApplicationModeTargetNames()); return 0; case "-v": case "--version": diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 0d82334..a5c37ab 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -341,12 +341,13 @@ public class CliFrontendParser { /** * Prints the help for the client. */ - public static void printHelp(Collection<CustomCommandLine> customCommandLines) { + public static void printHelp(Collection<CustomCommandLine> customCommandLines, String availableApplicationModeTargets) { System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]"); System.out.println(); System.out.println("The following actions are available:"); printHelpForRun(customCommandLines); + printHelpForRunApplication(availableApplicationModeTargets); printHelpForInfo(); printHelpForList(customCommandLines); printHelpForStop(customCommandLines); @@ -371,6 +372,18 @@ public class CliFrontendParser { System.out.println(); } + public static void printHelpForRunApplication(String availableApplicationModeTargets) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"run-application\" runs an application in Application Mode."); + System.out.println("\n Syntax: run-application -t [" + availableApplicationModeTargets + "] [OPTIONS] <jar-file> <arguments>"); + formatter.setSyntaxPrefix(" \"run-application\" action options:"); + formatter.printHelp(" ", new Options().addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES)); + System.out.println(); + } + public static void printHelpForInfo() { HelpFormatter formatter = new HelpFormatter(); formatter.setLeftPadding(5); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java index 3c0c5cc..a70ee32 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration; import javax.annotation.Nullable; +import java.util.Optional; + /** * A factory containing all the necessary information for creating clients to Flink clusters. */ @@ -62,4 +64,13 @@ public interface ClusterClientFactory<ClusterID> { * @return the corresponding {@link ClusterSpecification} for a new Flink cluster */ ClusterSpecification getClusterSpecification(Configuration configuration); + + /** + * Returns the option to be used when trying to execute an application in Application Mode + * using this cluster client factory, or an {@link Optional#empty()} if the environment of + * this cluster client factory does not support Application Mode. + */ + default Optional<String> getApplicationTargetName() { + return Optional.empty(); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java index 51eef13..ac9402a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java @@ -20,6 +20,8 @@ package org.apache.flink.client.deployment; import org.apache.flink.configuration.Configuration; +import java.util.stream.Stream; + /** * An interface used to discover the appropriate {@link ClusterClientFactory cluster client factory} based on the * provided {@link Configuration}. @@ -33,4 +35,10 @@ public interface ClusterClientServiceLoader { * @return the appropriate {@link ClusterClientFactory}. */ <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration); + + /** + * Loads and returns a stream of the names of all available + * execution target names for {@code Application Mode}. + */ + Stream<String> getApplicationModeTargetNames(); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java index f79a51e..96e5109 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java @@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.ServiceConfigurationError; import java.util.ServiceLoader; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -83,4 +86,29 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0); } + + @Override + public Stream<String> getApplicationModeTargetNames() { + final ServiceLoader<ClusterClientFactory> loader = + ServiceLoader.load(ClusterClientFactory.class); + + final List<String> result = new ArrayList<>(); + + final Iterator<ClusterClientFactory> it = loader.iterator(); + while (it.hasNext()) { + try { + final ClusterClientFactory clientFactory = it.next(); + + final Optional<String> applicationName = clientFactory.getApplicationTargetName(); + if (applicationName.isPresent()) { + result.add(applicationName.get()); + } + + } catch (ServiceConfigurationError e) { + // cannot be loaded, most likely because Hadoop is not + // in the classpath, we can ignore it for now. + } + } + return result.stream(); + } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java index f58383c..9622cf7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java @@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import java.util.stream.Stream; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -41,4 +43,9 @@ public class DummyClusterClientServiceLoader<ClusterID> implements ClusterClient checkNotNull(configuration); return new DummyClusterClientFactory<>(clusterClient); } + + @Override + public Stream<String> getApplicationModeTargetNames() { + return Stream.empty(); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java index 401711a..53a6c9e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java @@ -31,6 +31,8 @@ import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; +import java.util.Optional; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -66,6 +68,11 @@ public class KubernetesClusterClientFactory extends AbstractContainerizedCluster return configuration.getString(KubernetesConfigOptions.CLUSTER_ID); } + @Override + public Optional<String> getApplicationTargetName() { + return Optional.of(KubernetesDeploymentTarget.APPLICATION.getName()); + } + private String generateClusterId() { final String randomID = new AbstractID().toString(); return (CLUSTER_ID_PREFIX + randomID).substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java index 776172f..dbf7037 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import javax.annotation.Nullable; +import java.util.Optional; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -69,6 +71,11 @@ public class YarnClusterClientFactory extends AbstractContainerizedClusterClient return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null; } + @Override + public Optional<String> getApplicationTargetName() { + return Optional.of(YarnDeploymentTarget.APPLICATION.getName()); + } + private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) { final YarnClient yarnClient = YarnClient.createYarnClient(); final YarnConfiguration yarnConfiguration = new YarnConfiguration();
