[
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311649#comment-16311649
]
ASF GitHub Bot commented on FLINK-8339:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5225#discussion_r159698245
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
}
@Override
- public void addRunOptions(Options baseOptions) {
- }
+ public ClusterDescriptor<StandaloneClusterClient>
createClusterDescriptor(
+ Configuration configuration,
+ String configurationDirectory,
+ CommandLine commandLine) {
+ final Configuration effectiveConfiguration =
applyCommandLineOptionsToConfiguration(configuration, commandLine);
- @Override
- public void addGeneralOptions(Options baseOptions) {
+ return new StandaloneClusterDescriptor(effectiveConfiguration);
}
@Override
- public StandaloneClusterClient retrieveCluster(
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory) {
-
- if
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
- String addressWithPort =
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
- InetSocketAddress jobManagerAddress =
ClientUtils.parseHostPortAddress(addressWithPort);
- setJobManagerAddressInConfig(config, jobManagerAddress);
- }
-
- if
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
- String zkNamespace =
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
- config.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
zkNamespace);
- }
-
- StandaloneClusterDescriptor descriptor = new
StandaloneClusterDescriptor(config);
- return descriptor.retrieve(null);
+ @Nullable
+ public String getClusterId(Configuration configuration, CommandLine
commandLine) {
+ return "standalone";
}
@Override
- public StandaloneClusterClient createCluster(
- String applicationName,
- CommandLine commandLine,
- Configuration config,
- String configurationDirectory,
- List<URL> userJarFiles) throws
UnsupportedOperationException {
-
- StandaloneClusterDescriptor descriptor = new
StandaloneClusterDescriptor(config);
- ClusterSpecification clusterSpecification =
ClusterSpecification.fromConfiguration(config);
-
- return descriptor.deploySessionCluster(clusterSpecification);
+ public ClusterSpecification getClusterSpecification(Configuration
configuration, CommandLine commandLine) {
+ return new
ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --
Can this code actually be executed? It shouldn't be possible to deploy a
standalone cluster from what I understand:
`StandaloneClusterDescriptor#deploySessionCluster(ClusterSpecification
clusterSpecification)`
```
@Override
public StandaloneClusterClient
deploySessionCluster(ClusterSpecification clusterSpecification) throws
UnsupportedOperationException {
throw new UnsupportedOperationException("Can't deploy a
standalone cluster.");
}
```
If this shouldn't run, maybe throw an Exception directly.
> Let CustomCommandLine return a ClusterDescriptor
> ------------------------------------------------
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}}
> and deploy a cluster. In order to better separate concerns it would be good
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}}
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink
> cluster.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)