This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 442a53391af000693921e88d7a5a055c7a025509
Author: Xintong Song <[email protected]>
AuthorDate: Thu Apr 9 13:09:58 2020 +0800

    [FLINK-16742][runtime] Ignore unknown command line options for 
BashJavaUtils.
---
 .../ClusterConfigurationParserFactory.java         |  8 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    | 21 +----
 .../apache/flink/runtime/util/BashJavaUtils.java   | 38 ++++++++-
 .../runtime/util/ConfigurationParserUtils.java     | 35 ++++++++
 .../flink/runtime/util/BashJavaUtilsTest.java      | 96 ++++++++++++++++++++++
 5 files changed, 175 insertions(+), 23 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java
index 12115d7..069d5b13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactory.java
@@ -36,8 +36,7 @@ import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNA
  */
 public class ClusterConfigurationParserFactory implements 
ParserResultFactory<ClusterConfiguration> {
 
-       @Override
-       public Options getOptions() {
+       public static Options options() {
                final Options options = new Options();
                options.addOption(CONFIG_DIR_OPTION);
                options.addOption(DYNAMIC_PROPERTY_OPTION);
@@ -46,6 +45,11 @@ public class ClusterConfigurationParserFactory implements 
ParserResultFactory<Cl
        }
 
        @Override
+       public Options getOptions() {
+               return options();
+       }
+
+       @Override
        public ClusterConfiguration createResult(@Nonnull CommandLine 
commandLine) {
                final String configDir = 
commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 6fbd874..457b402 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -34,10 +32,7 @@ import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
-import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
-import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -55,6 +50,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
@@ -292,20 +288,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
        }
 
        public static Configuration loadConfiguration(String[] args) throws 
FlinkParseException {
-               final CommandLineParser<ClusterConfiguration> commandLineParser 
= new CommandLineParser<>(new ClusterConfigurationParserFactory());
-
-               final ClusterConfiguration clusterConfiguration;
-
-               try {
-                       clusterConfiguration = commandLineParser.parse(args);
-               } catch (FlinkParseException e) {
-                       LOG.error("Could not parse the command line options.", 
e);
-                       
commandLineParser.printHelp(TaskManagerRunner.class.getSimpleName());
-                       throw e;
-               }
-
-               final Configuration dynamicProperties = 
ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
-               return 
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), 
dynamicProperties);
+               return ConfigurationParserUtils.loadCommonConfiguration(args, 
TaskManagerRunner.class.getSimpleName());
        }
 
        public static void runTaskManager(Configuration configuration, 
ResourceID resourceId, PluginManager pluginManager) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
index e00d667..1478e1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
@@ -23,10 +23,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
-import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.util.FlinkException;
 
+import org.apache.commons.cli.Options;
+
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -63,11 +69,39 @@ public class BashJavaUtils {
        }
 
        private static Configuration 
getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {
-               Configuration configuration = 
TaskManagerRunner.loadConfiguration(args);
+               Configuration configuration = loadConfiguration(args);
                return 
TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                        configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY);
        }
 
+       @VisibleForTesting
+       static Configuration loadConfiguration(String[] args) throws 
FlinkException {
+               return ConfigurationParserUtils.loadCommonConfiguration(
+                       filterCmdArgs(args),
+                       BashJavaUtils.class.getSimpleName());
+       }
+
+       private static String[] filterCmdArgs(String[] args) {
+               final Options options = 
ClusterConfigurationParserFactory.options();
+               final List<String> filteredArgs = new ArrayList<>();
+               final Iterator<String> iter = Arrays.asList(args).iterator();
+
+               while (iter.hasNext()) {
+                       String token = iter.next();
+                       if (options.hasOption(token)) {
+                               filteredArgs.add(token);
+                               if (options.getOption(token).hasArg() && 
iter.hasNext()) {
+                                       filteredArgs.add(iter.next());
+                               }
+                       } else if (token.startsWith("-D")) {
+                               // "-Dkey=value"
+                               filteredArgs.add(token);
+                       }
+               }
+
+               return filteredArgs.toArray(new String[0]);
+       }
+
        /**
         * Commands that BashJavaUtils supports.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
index 1104d83..ec0a7ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
@@ -19,11 +19,20 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.MathUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.flink.util.MathUtils.checkedDownCast;
 
 /**
@@ -32,6 +41,8 @@ import static org.apache.flink.util.MathUtils.checkedDownCast;
  */
 public class ConfigurationParserUtils {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConfigurationParserUtils.class);
+
        /**
         * Parses the configuration to get the number of slots and validates 
the value.
         *
@@ -95,4 +106,28 @@ public class ConfigurationParserUtils {
 
                return pageSize;
        }
+
+       /**
+        * Generate configuration from only the config file and dynamic 
properties.
+        * @param args the commandline arguments
+        * @param cmdLineSyntax the syntax for this application
+        * @return generated configuration
+        * @throws FlinkParseException if the configuration cannot be generated
+        */
+       public static Configuration loadCommonConfiguration(String[] args, 
String cmdLineSyntax) throws FlinkParseException {
+               final CommandLineParser<ClusterConfiguration> commandLineParser 
= new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+               final ClusterConfiguration clusterConfiguration;
+
+               try {
+                       clusterConfiguration = commandLineParser.parse(args);
+               } catch (FlinkParseException e) {
+                       LOG.error("Could not parse the command line options.", 
e);
+                       commandLineParser.printHelp(cmdLineSyntax);
+                       throw e;
+               }
+
+               final Configuration dynamicProperties = 
ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
+               return 
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), 
dynamicProperties);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java
new file mode 100644
index 0000000..413a6fe
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BashJavaUtilsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link BashJavaUtils}.
+ */
+public class BashJavaUtilsTest extends TestLogger {
+
+       private static final String TEST_CONFIG_KEY = "test.key";
+       private static final String TEST_CONFIG_VALUE = "test_value";
+
+       @Rule
+       public TemporaryFolder confDir = new TemporaryFolder() {
+               @Override
+               protected void before() throws Throwable {
+                       super.create();
+                       File flinkConfFile = newFile("flink-conf.yaml");
+                       FileWriter fw = new FileWriter(flinkConfFile);
+                       fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + 
"\n");
+                       fw.close();
+               }
+       };
+
+       @Test
+       public void testLoadConfigurationConfigDirLongOpt() throws Exception {
+               String[] args = {"--configDir", 
confDir.getRoot().getAbsolutePath()};
+               Configuration configuration = 
BashJavaUtils.loadConfiguration(args);
+               verifyConfiguration(configuration, TEST_CONFIG_KEY, 
TEST_CONFIG_VALUE);
+       }
+
+       @Test
+       public void testLoadConfigurationConfigDirShortOpt() throws Exception {
+               String[] args = {"-c", confDir.getRoot().getAbsolutePath()};
+               Configuration configuration = 
BashJavaUtils.loadConfiguration(args);
+               verifyConfiguration(configuration, TEST_CONFIG_KEY, 
TEST_CONFIG_VALUE);
+       }
+
+       @Test
+       public void testLoadConfigurationDynamicPropertyWithSpace() throws 
Exception {
+               String[] args = {"--configDir", 
confDir.getRoot().getAbsolutePath(), "-D", "key=value"};
+               Configuration configuration = 
BashJavaUtils.loadConfiguration(args);
+               verifyConfiguration(configuration, "key", "value");
+       }
+
+       @Test
+       public void testLoadConfigurationDynamicPropertyWithoutSpace() throws 
Exception {
+               String[] args = {"--configDir", 
confDir.getRoot().getAbsolutePath(), "-Dkey=value"};
+               Configuration configuration = 
BashJavaUtils.loadConfiguration(args);
+               verifyConfiguration(configuration, "key", "value");
+       }
+
+       @Test
+       public void testLoadConfigurationIgnoreUnknownToken() throws Exception {
+               String [] args = {"unknown", "-u", "--configDir", 
confDir.getRoot().getAbsolutePath(), "--unknown", "-Dkey=value"};
+               Configuration configuration = 
BashJavaUtils.loadConfiguration(args);
+               verifyConfiguration(configuration, TEST_CONFIG_KEY, 
TEST_CONFIG_VALUE);
+               verifyConfiguration(configuration, "key", "value");
+       }
+
+       private void verifyConfiguration(Configuration config, String key, 
String expectedValue) {
+               ConfigOption<String> option = 
key(key).stringType().noDefaultValue();
+               assertThat(config.get(option), is(expectedValue));
+       }
+}

Reply via email to