This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch flip116 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 442a53391af000693921e88d7a5a055c7a025509 Author: Xintong Song <[email protected]> AuthorDate: Thu Apr 9 13:09:58 2020 +0800 [FLINK-16742][runtime] Ignore unknown command line options for BashJavaUtils. --- .../ClusterConfigurationParserFactory.java | 8 +- .../runtime/taskexecutor/TaskManagerRunner.java | 21 +---- .../apache/flink/runtime/util/BashJavaUtils.java | 38 ++++++++- .../runtime/util/ConfigurationParserUtils.java | 35 ++++++++ .../flink/runtime/util/BashJavaUtilsTest.java | 96 ++++++++++++++++++++++ 5 files changed, 175 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java index 12115d7..069d5b13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java @@ -36,8 +36,7 @@ import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNA */ public class ClusterConfigurationParserFactory implements ParserResultFactory<ClusterConfiguration> { - @Override - public Options getOptions() { + public static Options options() { final Options options = new Options(); options.addOption(CONFIG_DIR_OPTION); options.addOption(DYNAMIC_PROPERTY_OPTION); @@ -46,6 +45,11 @@ public class ClusterConfigurationParserFactory implements ParserResultFactory<Cl } @Override + public Options getOptions() { + return options(); + } + + @Override public ClusterConfiguration createResult(@Nonnull CommandLine commandLine) { final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 6fbd874..457b402 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.FileSystem; @@ -34,10 +32,7 @@ import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.entrypoint.ClusterConfiguration; -import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; import org.apache.flink.runtime.entrypoint.FlinkParseException; -import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -55,6 +50,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; @@ -292,20 +288,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync } public static Configuration loadConfiguration(String[] args) throws FlinkParseException { - final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); - - final ClusterConfiguration clusterConfiguration; - - try { - clusterConfiguration = commandLineParser.parse(args); - } catch (FlinkParseException e) { - LOG.error("Could not parse the command line options.", e); - commandLineParser.printHelp(TaskManagerRunner.class.getSimpleName()); - throw e; - } - - final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties()); - return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties); + return ConfigurationParserUtils.loadCommonConfiguration(args, TaskManagerRunner.class.getSimpleName()); } public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java index e00d667..1478e1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java @@ -23,10 +23,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; -import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; +import org.apache.flink.util.FlinkException; +import org.apache.commons.cli.Options; + +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; @@ -63,11 +69,39 @@ public class BashJavaUtils { } private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception { - Configuration configuration = TaskManagerRunner.loadConfiguration(args); + Configuration configuration = loadConfiguration(args); return TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY); } + @VisibleForTesting + static Configuration loadConfiguration(String[] args) throws FlinkException { + return ConfigurationParserUtils.loadCommonConfiguration( + filterCmdArgs(args), + BashJavaUtils.class.getSimpleName()); + } + + private static String[] filterCmdArgs(String[] args) { + final Options options = ClusterConfigurationParserFactory.options(); + final List<String> filteredArgs = new ArrayList<>(); + final Iterator<String> iter = Arrays.asList(args).iterator(); + + while (iter.hasNext()) { + String token = iter.next(); + if (options.hasOption(token)) { + filteredArgs.add(token); + if (options.getOption(token).hasArg() && iter.hasNext()) { + filteredArgs.add(iter.next()); + } + } else if (token.startsWith("-D")) { + // "-Dkey=value" + filteredArgs.add(token); + } + } + + return filteredArgs.toArray(new String[0]); + } + /** * Commands that BashJavaUtils supports. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java index 1104d83..ec0a7ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java @@ -19,11 +19,20 @@ package org.apache.flink.runtime.util; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.entrypoint.ClusterConfiguration; +import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.flink.util.MathUtils.checkedDownCast; /** @@ -32,6 +41,8 @@ import static org.apache.flink.util.MathUtils.checkedDownCast; */ public class ConfigurationParserUtils { + private static final Logger LOG = LoggerFactory.getLogger(ConfigurationParserUtils.class); + /** * Parses the configuration to get the number of slots and validates the value. * @@ -95,4 +106,28 @@ public class ConfigurationParserUtils { return pageSize; } + + /** + * Generate configuration from only the config file and dynamic properties. + * @param args the commandline arguments + * @param cmdLineSyntax the syntax for this application + * @return generated configuration + * @throws FlinkParseException if the configuration cannot be generated + */ + public static Configuration loadCommonConfiguration(String[] args, String cmdLineSyntax) throws FlinkParseException { + final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + final ClusterConfiguration clusterConfiguration; + + try { + clusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse the command line options.", e); + commandLineParser.printHelp(cmdLineSyntax); + throw e; + } + + final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties()); + return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java new file mode 100644 index 0000000..413a6fe --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileWriter; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Tests for {@link BashJavaUtils}. + */ +public class BashJavaUtilsTest extends TestLogger { + + private static final String TEST_CONFIG_KEY = "test.key"; + private static final String TEST_CONFIG_VALUE = "test_value"; + + @Rule + public TemporaryFolder confDir = new TemporaryFolder() { + @Override + protected void before() throws Throwable { + super.create(); + File flinkConfFile = newFile("flink-conf.yaml"); + FileWriter fw = new FileWriter(flinkConfFile); + fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n"); + fw.close(); + } + }; + + @Test + public void testLoadConfigurationConfigDirLongOpt() throws Exception { + String[] args = {"--configDir", confDir.getRoot().getAbsolutePath()}; + Configuration configuration = BashJavaUtils.loadConfiguration(args); + verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE); + } + + @Test + public void testLoadConfigurationConfigDirShortOpt() throws Exception { + String[] args = {"-c", confDir.getRoot().getAbsolutePath()}; + Configuration configuration = BashJavaUtils.loadConfiguration(args); + verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE); + } + + @Test + public void testLoadConfigurationDynamicPropertyWithSpace() throws Exception { + String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-D", "key=value"}; + Configuration configuration = BashJavaUtils.loadConfiguration(args); + verifyConfiguration(configuration, "key", "value"); + } + + @Test + public void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception { + String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-Dkey=value"}; + Configuration configuration = BashJavaUtils.loadConfiguration(args); + verifyConfiguration(configuration, "key", "value"); + } + + @Test + public void testLoadConfigurationIgnoreUnknownToken() throws Exception { + String [] args = {"unknown", "-u", "--configDir", confDir.getRoot().getAbsolutePath(), "--unknown", "-Dkey=value"}; + Configuration configuration = BashJavaUtils.loadConfiguration(args); + verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE); + verifyConfiguration(configuration, "key", "value"); + } + + private void verifyConfiguration(Configuration config, String key, String expectedValue) { + ConfigOption<String> option = key(key).stringType().noDefaultValue(); + assertThat(config.get(option), is(expectedValue)); + } +}
