[
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322274#comment-16322274
]
ASF GitHub Bot commented on FLINK-8339:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5225#discussion_r160970724
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
@Override
- public void addRunOptions(Options baseOptions) {
- }
+ public ClusterDescriptor<StandaloneClusterClient> createClusterDescriptor(
+ Configuration configuration,
+ String configurationDirectory,
+ CommandLine commandLine) {
+ final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine);
- @Override
- public void addGeneralOptions(Options baseOptions) {
+ return new StandaloneClusterDescriptor(effectiveConfiguration);
}
@Override
- public StandaloneClusterClient retrieveCluster(
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory) {
-
- if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
- String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
- InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
- setJobManagerAddressInConfig(config, jobManagerAddress);
- }
-
- if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
- String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
- config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
- }
-
- StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
- return descriptor.retrieve(null);
+ @Nullable
+ public String getClusterId(Configuration configuration, CommandLine commandLine) {
+ return "standalone";
}
@Override
- public StandaloneClusterClient createCluster(
- String applicationName,
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory,
- List<URL> userJarFiles) throws UnsupportedOperationException {
-
- StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
- ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
-
- return descriptor.deploySessionCluster(clusterSpecification);
+ public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) {
+ return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --
On second thought, I think it's fine as it is. The unsupported operation is
`StandaloneClusterDescriptor#deploySessionCluster`, which is unrelated to
`DefaultCLI#getClusterSpecification`. In the future, we may well be able to
start a standalone cluster programmatically too.
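To illustrate the point, here is a minimal sketch with hypothetical stand-in
classes (`Spec`, `StandaloneDescriptor` and `StandaloneCli` are made up for
this example and are not the Flink types). It only shows that building a
specification on the CLI side can succeed while the deploy call on the
descriptor side remains the single place that rejects the operation. The
default of one task manager is an assumption chosen for illustration.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Flink classes.
final class SpecificationSketch {

    /** Plain value object describing the desired cluster size. */
    static final class Spec {
        final int taskManagers;

        Spec(int taskManagers) {
            this.taskManagers = taskManagers;
        }
    }

    /** Descriptor stand-in: deployment is the only unsupported operation. */
    static final class StandaloneDescriptor {
        String deploySessionCluster(Spec spec) {
            throw new UnsupportedOperationException("Cannot deploy a standalone cluster.");
        }
    }

    /** CLI stand-in: building a specification never touches the descriptor. */
    static final class StandaloneCli {
        Spec getClusterSpecification() {
            return new Spec(1); // assumed default, chosen for illustration
        }
    }

    public static void main(String[] args) {
        // Creating the specification works regardless of deploy support.
        Spec spec = new StandaloneCli().getClusterSpecification();
        System.out.println("task managers: " + spec.taskManagers);

        // Only the deploy call itself is rejected.
        try {
            new StandaloneDescriptor().deploySessionCluster(spec);
        } catch (UnsupportedOperationException e) {
            System.out.println("deploy rejected: " + e.getMessage());
        }
    }
}
```

If standalone deployment became possible later, only the descriptor stand-in
would need to change in this sketch; the CLI side would stay as it is.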
> Let CustomCommandLine return a ClusterDescriptor
> ------------------------------------------------
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} is currently able to retrieve a {{ClusterClient}}
> and deploy a cluster. To better separate concerns, it would be good
> if the {{CustomCommandLine}} simply returned a {{ClusterDescriptor}},
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink
> cluster.
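
The separation described in the issue could look roughly like the following
sketch. All names here (`Client`, `Descriptor`, `CommandLineSketch`,
`InMemoryCommandLine`) and the address string are hypothetical and heavily
simplified; they are not the Flink interfaces. The command line only builds a
descriptor, and the caller decides whether to retrieve an existing cluster or
deploy a new one.

```java
// Hypothetical, simplified interfaces for illustration; NOT the Flink API.
final class DescriptorSketch {

    /** Client stand-in: just reports what it is connected to. */
    interface Client {
        String describe();
    }

    /** Descriptor stand-in: the single place offering retrieve and deploy. */
    interface Descriptor {
        Client retrieve(String clusterId);

        Client deploySessionCluster();
    }

    /** Command line stand-in: it only builds a descriptor. */
    interface CommandLineSketch {
        Descriptor createClusterDescriptor(String address);
    }

    /** Toy implementation that returns strings instead of contacting a cluster. */
    static final class InMemoryCommandLine implements CommandLineSketch {
        @Override
        public Descriptor createClusterDescriptor(final String address) {
            return new Descriptor() {
                @Override
                public Client retrieve(String clusterId) {
                    return () -> "attached to " + clusterId + " at " + address;
                }

                @Override
                public Client deploySessionCluster() {
                    return () -> "deployed new session at " + address;
                }
            };
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor =
            new InMemoryCommandLine().createClusterDescriptor("localhost:6123");

        // The caller, not the command line, picks the operation.
        System.out.println(descriptor.retrieve("standalone").describe());
        System.out.println(descriptor.deploySessionCluster().describe());
    }
}
```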