This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79abfaab3460834887df9e8284dff51569b63ecc Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Jan 12 16:12:03 2024 +0800 [FLINK-34080][configuration] Add the `T get(ConfigOption<T> configOption, T overrideDefault)` for Configuration --- .../apache/flink/configuration/Configuration.java | 14 ++++++++ .../configuration/DelegatingConfiguration.java | 5 +++ .../flink/configuration/ConfigurationTest.java | 39 ++++++++++++++++++++++ .../configuration/DelegatingConfigurationTest.java | 10 ++++++ 4 files changed, 68 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index fa1a1b85186..04f975d801d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -737,6 +737,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters return getOptional(option).orElseGet(option::defaultValue); } + /** + * Returns the value associated with the given config option as a T. If no value is mapped under + * any key of the option, it returns the specified default instead of the option's default + * value. 
+ * + * @param configOption The configuration option + * @param overrideDefault The value to return if no value was mapped for any key of the option + * @return the configured value associated with the given config option, or the overrideDefault + */ + @PublicEvolving + public <T> T get(ConfigOption<T> configOption, T overrideDefault) { + return getOptional(configOption).orElse(overrideDefault); + } + @Override public <T> Optional<T> getOptional(ConfigOption<T> option) { Optional<Object> rawValue = getRawValueFromOption(option); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index 2958496394c..d39f3b44d6d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -362,6 +362,11 @@ public final class DelegatingConfiguration extends Configuration { return backingConfig.get(prefixOption(option, prefix)); } + @Override + public <T> T get(ConfigOption<T> configOption, T overrideDefault) { + return backingConfig.get(prefixOption(configOption, prefix), overrideDefault); + } + @Override public <T> Optional<T> getOptional(ConfigOption<T> option) { return backingConfig.getOptional(prefixOption(option, prefix)); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index 4c9ed3d4772..e540a3d3738 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -554,6 +554,45 @@ public class ConfigurationTest { .doesNotContain("secret_value")); } + @TestTemplate + void testGetWithOverrideDefault() { + final Configuration conf = new Configuration(standardYaml); + + // Test for 
integer without default value. + ConfigOption<Integer> integerOption0 = + ConfigOptions.key("integer.key0").intType().noDefaultValue(); + // integerOption0 doesn't exist in conf, and it should be overrideDefault. + assertThat(conf.get(integerOption0, 2)).isEqualTo(2); + // integerOption0 exists in conf, and it should be the value that was set before. + conf.set(integerOption0, 3); + assertThat(conf.get(integerOption0, 2)).isEqualTo(3); + + // Test for integer with default value, the default value should be ignored. + ConfigOption<Integer> integerOption1 = + ConfigOptions.key("integer.key1").intType().defaultValue(4); + assertThat(conf.get(integerOption1, 5)).isEqualTo(5); + // integerOption1 is changed. + conf.set(integerOption1, 6); + assertThat(conf.get(integerOption1, 5)).isEqualTo(6); + + // Test for string without default value. + ConfigOption<String> stringOption0 = + ConfigOptions.key("string.key0").stringType().noDefaultValue(); + // stringOption0 doesn't exist in conf, and it should be overrideDefault. + assertThat(conf.get(stringOption0, "a")).isEqualTo("a"); + // stringOption0 exists in conf, and it should be the value that was set before. + conf.set(stringOption0, "b"); + assertThat(conf.get(stringOption0, "a")).isEqualTo("b"); + + // Test for string with default value, the default value should be ignored. + ConfigOption<String> stringOption1 = + ConfigOptions.key("string.key1").stringType().defaultValue("c"); + assertThat(conf.get(stringOption1, "d")).isEqualTo("d"); + // stringOption1 is changed. 
+ conf.set(stringOption1, "e"); + assertThat(conf.get(stringOption1, "d")).isEqualTo("e"); + } + // -------------------------------------------------------------------------------------------- // Test classes // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java index d7ad7ec90ed..3ad3fc07bd7 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -157,41 +157,51 @@ class DelegatingConfigurationTest { // integerOption doesn't exist in delegatingConf, and it should be overrideDefault. original.setInteger(integerOption, 1); assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(2); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2); // integerOption exists in delegatingConf, and it should be value that set before. 
delegatingConf.setInteger(integerOption, 3); assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(3); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3); // Test for float ConfigOption<Float> floatOption = ConfigOptions.key("float.key").floatType().noDefaultValue(); original.setFloat(floatOption, 4f); assertThat(delegatingConf.getFloat(floatOption, 5f)).isEqualTo(5f); + assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(5f); delegatingConf.setFloat(floatOption, 6f); assertThat(delegatingConf.getFloat(floatOption, 5f)).isEqualTo(6f); + assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(6f); // Test for double ConfigOption<Double> doubleOption = ConfigOptions.key("double.key").doubleType().noDefaultValue(); original.setDouble(doubleOption, 7d); assertThat(delegatingConf.getDouble(doubleOption, 8d)).isEqualTo(8d); + assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(8d); delegatingConf.setDouble(doubleOption, 9f); assertThat(delegatingConf.getDouble(doubleOption, 8d)).isEqualTo(9f); + assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(9f); // Test for long ConfigOption<Long> longOption = ConfigOptions.key("long.key").longType().noDefaultValue(); original.setLong(longOption, 10L); assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(11L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L); delegatingConf.setLong(longOption, 12L); assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(12L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L); // Test for boolean ConfigOption<Boolean> booleanOption = ConfigOptions.key("boolean.key").booleanType().noDefaultValue(); original.setBoolean(booleanOption, false); assertThat(delegatingConf.getBoolean(booleanOption, true)).isEqualTo(true); + assertThat(delegatingConf.get(booleanOption, true)).isEqualTo(true); delegatingConf.setBoolean(booleanOption, false); assertThat(delegatingConf.getBoolean(booleanOption, true)).isEqualTo(false); 
+ assertThat(delegatingConf.get(booleanOption, true)).isEqualTo(false); } @Test