tillrohrmann commented on a change in pull request #13747: URL: https://github.com/apache/flink/pull/13747#discussion_r516025408
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DynamicParametersConfigurationParserFactory.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; + +/** + * {@code DynamicParametersCOnfigurationParserFactory} can be used to extract the dynamic parameters Review comment: typo: COnfiguration ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ########## @@ -51,8 +56,24 @@ */ public class YarnEntrypointUtils { - public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) { - Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory); + private static final Logger LOG = LoggerFactory.getLogger(YarnEntrypointUtils.class); + + public static Configuration loadConfiguration(String workingDirectory, String[] args, Map<String, String> env) { + Configuration dynamicParameters = null; + final CommandLineParser<Configuration> commandLineParser = new CommandLineParser<>( + new DynamicParametersConfigurationParserFactory()); + + try { + dynamicParameters = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName()); + System.exit(1); + } Review comment: This snipped looks quite similar to what you've put into the `KubernetesEntrypointUtils`. I'd suggest to factor it out and to reuse it. Maybe even pass `dynamicParameters` into this method instead of `args`. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java ########## @@ -73,7 +76,17 @@ public static void main(String[] args) { LOG.warn("Could not log YARN environment information.", e); } - Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env); + final CommandLineParser<Configuration> commandLineParser = new CommandLineParser<>(new DynamicParametersConfigurationParserFactory()); + Configuration dynamicParameters = null; + try { + dynamicParameters = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(YarnSessionClusterEntrypoint.class.getSimpleName()); + System.exit(1); + } Review comment: Maybe one could introduce ``` private static <T> T parseArgumentsOrExit( Logger logger, String componentName, String[] args, ParserResultFactory<T> parserResultFactory) { final CommandLineParser<T> commandLineParser = new CommandLineParser<>(parserResultFactory); try { return commandLineParser.parse(args); } catch (FlinkParseException e) { logger.error("Could not parse command line arguments {}.", args, e); commandLineParser.printHelp(componentName); System.exit(1); return null; } } ``` into `ClusterEntrypointUtils` and use it across the cluster entrypoints. This would also ensure that we fail with a consistent error code if the parsing fails. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
