[FLINK-9820] Forward dynamic properties to Flink configuration in ClusterEntrypoint
With this commit we can use dynamic properties to overwrite configuration values in the ClusterEntrypoint.

This closes #6317.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fbbf8ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fbbf8ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fbbf8ee

Branch: refs/heads/master
Commit: 2fbbf8ee662647c71581f5cd989226be820fed0f
Parents: 5a4bdf2
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jul 10 23:23:59 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Jul 13 18:01:54 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigurationUtils.java | 21 ++++++++
 .../configuration/ConfigurationUtilsTest.java   | 54 ++++++++++++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java   |  6 ++-
 3 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fbbf8ee/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 8566a43..3d1d830 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.configuration;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.Properties;
+import java.util.Set;
 
 /**
  * Utility class for {@link Configuration} related helper functions.
@@ -54,6 +56,25 @@ public class ConfigurationUtils {
 		return splitPaths(configValue);
 	}
 
+	/**
+	 * Creates a new {@link Configuration} from the given {@link Properties}.
+	 *
+	 * @param properties to convert into a {@link Configuration}
+	 * @return {@link Configuration} which has been populated by the values of the given {@link Properties}
+	 */
+	@Nonnull
+	public static Configuration createConfiguration(Properties properties) {
+		final Configuration configuration = new Configuration();
+
+		final Set<String> propertyNames = properties.stringPropertyNames();
+
+		for (String propertyName : propertyNames) {
+			configuration.setString(propertyName, properties.getProperty(propertyName));
+		}
+
+		return configuration;
+	}
+
 	@Nonnull
 	private static String[] splitPaths(@Nonnull String separatedPaths) {
 		return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY;

http://git-wip-us.apache.org/repos/asf/flink/blob/2fbbf8ee/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
new file mode 100644
index 0000000..2019d98
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link ConfigurationUtils}.
+ */
+public class ConfigurationUtilsTest extends TestLogger {
+
+	@Test
+	public void testPropertiesToConfiguration() {
+		final Properties properties = new Properties();
+		final int entries = 10;
+
+		for (int i = 0; i < entries; i++) {
+			properties.setProperty("key" + i, "value" + i);
+		}
+
+		final Configuration configuration = ConfigurationUtils.createConfiguration(properties);
+
+		for (String key : properties.stringPropertyNames()) {
+			assertThat(configuration.getString(key, ""), is(equalTo(properties.getProperty(key))));
+		}
+
+		assertThat(configuration.toMap().size(), is(properties.size()));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2fbbf8ee/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9ae7b8b..b429de5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -688,14 +689,15 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		Configuration configuration,
 		ScheduledExecutor scheduledExecutor) throws IOException;
 
-	private static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException {
+	protected static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException {
 		final CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
 
 		return clusterConfigurationParser.parse(args);
 	}
 
 	protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) {
-		final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir());
+		final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(entrypointClusterConfiguration.getDynamicProperties());
+		final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir(), dynamicProperties);
 
 		final int restPort = entrypointClusterConfiguration.getRestPort();