Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5225#discussion_r160995333
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
@Override
- public void addRunOptions(Options baseOptions) {
- }
+ public ClusterDescriptor<StandaloneClusterClient> createClusterDescriptor(
+ Configuration configuration,
+ String configurationDirectory,
+ CommandLine commandLine) {
+ final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine);
- @Override
- public void addGeneralOptions(Options baseOptions) {
+ return new StandaloneClusterDescriptor(effectiveConfiguration);
}
@Override
- public StandaloneClusterClient retrieveCluster(
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory) {
-
- if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
- String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
- InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
- setJobManagerAddressInConfig(config, jobManagerAddress);
- }
-
- if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
- String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
- config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
- }
-
- StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
- return descriptor.retrieve(null);
+ @Nullable
+ public String getClusterId(Configuration configuration, CommandLine commandLine) {
+ return "standalone";
}
@Override
- public StandaloneClusterClient createCluster(
- String applicationName,
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory,
- List<URL> userJarFiles) throws UnsupportedOperationException {
-
- StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
- ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
-
- return descriptor.deploySessionCluster(clusterSpecification);
+ public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) {
+ return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --
ok
---