This is an automated email from the ASF dual-hosted git repository. sxnan pushed a commit to branch config-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit da9e852f07e80058d39d5c47e1129705dae9ef6f Author: sxnan <[email protected]> AuthorDate: Tue Sep 10 11:31:28 2024 +0800 [FLINK-34079][config] Refactor callers that use get/setInt --- .../flink/api/common/cache/DistributedCache.java | 7 +- .../flink/api/common/operators/Operator.java | 13 --- .../flink/configuration/ConfigurationUtils.java | 4 + .../ConfigurationConversionsTest.java | 53 ++++++----- .../flink/configuration/ConfigurationTest.java | 79 ++++++++-------- .../configuration/DelegatingConfigurationTest.java | 12 +-- .../fs/LimitedConnectionsConfigurationTest.java | 11 ++- .../fs/azurefs/AzureBlobStorageFSFactoryTest.java | 3 +- .../hdfs/LimitedConnectionsConfigurationTest.java | 11 ++- .../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 3 +- .../flink/runtime/operators/util/TaskConfig.java | 103 ++++++++++++--------- .../flink/streaming/api/graph/StreamConfig.java | 42 +++++---- .../SlotCountExceedingParallelismTest.java | 14 ++- ...tVertexParallelismAndInputInfosDeciderTest.java | 4 +- .../recordutils/RecordComparatorFactory.java | 9 +- .../manual/StreamingScalabilityAndLatency.java | 5 +- .../org/apache/flink/test/operators/MapITCase.java | 3 +- .../test/runtime/NetworkStackThroughputITCase.java | 11 ++- .../apache/flink/yarn/YarnClusterDescriptor.java | 6 +- 19 files changed, 215 insertions(+), 178 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java index 7049cfa1487..a04d1a49d6e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java @@ -42,6 +42,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; /** * DistributedCache provides static methods to write the registered cache files into job @@ -174,8 +175,8 @@ public class DistributedCache { public static void writeFileInfoToConfig( String name, DistributedCacheEntry e, Configuration conf) { - int num = conf.getInteger(CACHE_FILE_NUM, 0) + 1; - conf.setInteger(CACHE_FILE_NUM, num); + int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0) + 1; + conf.set(getIntConfigOption(CACHE_FILE_NUM), num); conf.setString(CACHE_FILE_NAME + num, name); conf.setString(CACHE_FILE_PATH + num, e.filePath); conf.set( @@ -191,7 +192,7 @@ public class DistributedCache { public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig( Configuration conf) { - int num = conf.getInteger(CACHE_FILE_NUM, 0); + int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0); if (num == 0) { return Collections.emptySet(); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index c8db0d3a11c..e32f4032100 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -131,19 +131,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { this.parameters.setString(key, value); } - /** - * Sets a stub parameters in the configuration of this contract. The stub parameters are - * accessible by the user code at runtime. Parameters that the user code needs to access at - * runtime to configure its behavior are typically stored as stub parameters. - * - * @see #getParameters() - * @param key The parameter key. - * @param value The parameter value. - */ - public void setParameter(String key, int value) { - this.parameters.setInteger(key, value); - } - /** * Gets the parallelism for this contract instance. 
The parallelism denotes how many parallel * instances of the user function will be spawned during the execution. If this value is {@link diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 7acc0145fd9..f561b77cab9 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -703,4 +703,8 @@ public class ConfigurationUtils { public static ConfigOption<Float> getFloatConfigOption(String key) { return ConfigOptions.key(key).floatType().noDefaultValue(); } + + public static ConfigOption<Integer> getIntConfigOption(String key) { + return ConfigOptions.key(key).intType().noDefaultValue(); + } } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java index dd50d9b216b..d96bcd85e73 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java @@ -37,6 +37,7 @@ import java.util.Optional; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -60,7 +61,7 @@ class ConfigurationConversionsTest { void init() { pc = new Configuration(); - pc.setInteger("int", 5); + pc.set(getIntConfigOption("int"), 5); pc.setLong("long", 15); pc.setLong("too_long", 
TOO_LONG); pc.set(getFloatConfigOption("float"), 2.1456775f); @@ -77,7 +78,7 @@ class ConfigurationConversionsTest { private static Collection<TestSpec> getSpecs() { return Arrays.asList( // from integer - TestSpec.whenAccessed(conf -> conf.getInteger("int", 0)).expect(5), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("int"), 0)).expect(5), TestSpec.whenAccessed(conf -> conf.getLong("int", 0)).expect(5L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("int"), 0f)).expect(5f), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("int"), 0.0)) @@ -89,7 +90,7 @@ class ConfigurationConversionsTest { .expectException("Configuration cannot evaluate value 5 as a byte[] value"), // from long - TestSpec.whenAccessed(conf -> conf.getInteger("long", 0)).expect(15), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("long"), 0)).expect(15), TestSpec.whenAccessed(conf -> conf.getLong("long", 0)).expect(15L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("long"), 0f)) .expect(15f), @@ -103,9 +104,8 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 15 as a byte[] value"), // from too long - TestSpec.whenAccessed(conf -> conf.getInteger("too_long", 0)) - .expectException( - "Configuration value 2147483657 overflows/underflows the integer type"), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("too_long"), 0)) + .expectException("Could not parse value '2147483657' for key 'too_long'."), TestSpec.whenAccessed(conf -> conf.getLong("too_long", 0)).expect(TOO_LONG), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("too_long"), 0f)) .expect((float) TOO_LONG), @@ -120,9 +120,10 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 2147483657 as a byte[] value"), // from float - TestSpec.whenAccessed(conf -> conf.getInteger("float", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("float"), 0)) .expectException( - "For input string: 
\"2.1456776\"", NumberFormatException.class), + "Could not parse value '2.1456776' for key 'float'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("float", 0)) .expectException( "For input string: \"2.1456776\"", NumberFormatException.class), @@ -142,10 +143,10 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 2.1456776 as a byte[] value"), // from double - TestSpec.whenAccessed(conf -> conf.getInteger("double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("double"), 0)) .expectException( - "For input string: \"3.141592653589793\"", - NumberFormatException.class), + "Could not parse value '3.141592653589793' for key 'double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("double", 0)) .expectException( "For input string: \"3.141592653589793\"", @@ -164,8 +165,10 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 3.141592653589793 as a byte[] value"), // from negative double - TestSpec.whenAccessed(conf -> conf.getInteger("negative_double", 0)) - .expectException("For input string: \"-1.0\"", NumberFormatException.class), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("negative_double"), 0)) + .expectException( + "Could not parse value '-1.0' for key 'negative_double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("negative_double", 0)) .expectException("For input string: \"-1.0\"", NumberFormatException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("negative_double"), 0f)) @@ -183,8 +186,10 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value -1.0 as a byte[] value"), // from zero - TestSpec.whenAccessed(conf -> conf.getInteger("zero", 0)) - .expectException("For input string: \"0.0\"", NumberFormatException.class), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("zero"), 0)) + .expectException( + "Could not parse value '0.0' 
for key 'zero'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("zero", 0)) .expectException("For input string: \"0.0\"", NumberFormatException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("zero"), 0f)) @@ -200,10 +205,10 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 0.0 as a byte[] value"), // from too long double - TestSpec.whenAccessed(conf -> conf.getInteger("too_long_double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("too_long_double"), 0)) .expectException( - "For input string: \"1.7976931348623157E308\"", - NumberFormatException.class), + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("too_long_double", 0)) .expectException( "For input string: \"1.7976931348623157E308\"", @@ -225,7 +230,7 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 1.7976931348623157E308 as a byte[] value"), // from string - TestSpec.whenAccessed(conf -> conf.getInteger("string", 0)).expect(42), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("string"), 0)).expect(42), TestSpec.whenAccessed(conf -> conf.getLong("string", 0)).expect(42L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("string"), 0f)) .expect(42f), @@ -239,9 +244,11 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value 42 as a byte[] value"), // from non convertible string - TestSpec.whenAccessed(conf -> conf.getInteger("non_convertible_string", 0)) + TestSpec.whenAccessed( + conf -> conf.get(getIntConfigOption("non_convertible_string"), 0)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.getLong("non_convertible_string", 0)) .expectException( "For input 
string: \"bcdefg&&\"", NumberFormatException.class), @@ -274,8 +281,8 @@ class ConfigurationConversionsTest { "Configuration cannot evaluate value bcdefg&& as a byte[] value"), // from boolean - TestSpec.whenAccessed(conf -> conf.getInteger("boolean", 0)) - .expectException("For input string: \"true\""), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("boolean"), 0)) + .expectException("Could not parse value 'true' for key 'boolean'."), TestSpec.whenAccessed(conf -> conf.getLong("boolean", 0)) .expectException("For input string: \"true\""), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("boolean"), 0f)) diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index 85f9b6006ed..6363d748d5e 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -40,6 +40,7 @@ import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -86,7 +87,7 @@ class ConfigurationTest { void testConfigurationSerializationAndGetters() throws ClassNotFoundException, IOException { final Configuration orig = new Configuration(standardYaml); orig.setString("mykey", "myvalue"); - orig.setInteger("mynumber", 100); + orig.set(getIntConfigOption("mynumber"), 100); orig.setLong("longvalue", 478236947162389746L); orig.set(getFloatConfigOption("PI"), 3.1415926f); orig.set(getDoubleConfigOption("E"), 
Math.E); @@ -95,7 +96,7 @@ class ConfigurationTest { final Configuration copy = InstantiationUtil.createCopyWritable(orig); assertThat("myvalue").isEqualTo(copy.getString("mykey", "null")); - assertThat(copy.getInteger("mynumber", 0)).isEqualTo(100); + assertThat(copy.get(getIntConfigOption("mynumber"), 0)).isEqualTo(100); assertThat(copy.getLong("longvalue", 0L)).isEqualTo(478236947162389746L); assertThat(copy.get(getFloatConfigOption("PI"), 3.1415926f)) .isCloseTo(3.1415926f, Offset.offset(0.0f)); @@ -124,7 +125,7 @@ class ConfigurationTest { @TestTemplate void testOptionWithDefault() { Configuration cfg = new Configuration(standardYaml); - cfg.setInteger("int-key", 11); + cfg.set(getIntConfigOption("int-key"), 11); cfg.setString("string-key", "abc"); ConfigOption<String> presentStringOption = @@ -135,7 +136,7 @@ class ConfigurationTest { assertThat(cfg.getString(presentStringOption)).isEqualTo("abc"); assertThat(cfg.getValue(presentStringOption)).isEqualTo("abc"); - assertThat(cfg.getInteger(presentIntOption)).isEqualTo(11); + assertThat(cfg.get(presentIntOption)).isEqualTo(11); assertThat(cfg.getValue(presentIntOption)).isEqualTo("11"); // test getting default when no value is present @@ -152,14 +153,14 @@ class ConfigurationTest { assertThat(cfg.getString(stringOption, "override")).isEqualTo("override"); // getting a primitive with a default value should work - assertThat(cfg.getInteger(intOption)).isEqualTo(87); + assertThat(cfg.get(intOption)).isEqualTo(87); assertThat(cfg.getValue(intOption)).isEqualTo("87"); } @TestTemplate void testOptionWithNoDefault() { Configuration cfg = new Configuration(standardYaml); - cfg.setInteger("int-key", 11); + cfg.get(getIntConfigOption("int-key"), 11); cfg.setString("string-key", "abc"); ConfigOption<String> presentStringOption = @@ -183,9 +184,9 @@ class ConfigurationTest { @TestTemplate void testDeprecatedKeys() { Configuration cfg = new Configuration(standardYaml); - cfg.setInteger("the-key", 11); - 
cfg.setInteger("old-key", 12); - cfg.setInteger("older-key", 13); + cfg.set(getIntConfigOption("the-key"), 11); + cfg.set(getIntConfigOption("old-key"), 12); + cfg.set(getIntConfigOption("older-key"), 13); ConfigOption<Integer> matchesFirst = ConfigOptions.key("the-key") @@ -211,18 +212,18 @@ class ConfigurationTest { .defaultValue(-1) .withDeprecatedKeys("not-there", "also-not-there"); - assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11); - assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12); - assertThat(cfg.getInteger(matchesThird)).isEqualTo(13); - assertThat(cfg.getInteger(notContained)).isEqualTo(-1); + assertThat(cfg.get(matchesFirst)).isEqualTo(11); + assertThat(cfg.get(matchesSecond)).isEqualTo(12); + assertThat(cfg.get(matchesThird)).isEqualTo(13); + assertThat(cfg.get(notContained)).isEqualTo(-1); } @TestTemplate void testFallbackKeys() { Configuration cfg = new Configuration(standardYaml); - cfg.setInteger("the-key", 11); - cfg.setInteger("old-key", 12); - cfg.setInteger("older-key", 13); + cfg.set(getIntConfigOption("the-key"), 11); + cfg.set(getIntConfigOption("old-key"), 12); + cfg.set(getIntConfigOption("older-key"), 13); ConfigOption<Integer> matchesFirst = ConfigOptions.key("the-key") @@ -248,10 +249,10 @@ class ConfigurationTest { .defaultValue(-1) .withFallbackKeys("not-there", "also-not-there"); - assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11); - assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12); - assertThat(cfg.getInteger(matchesThird)).isEqualTo(13); - assertThat(cfg.getInteger(notContained)).isEqualTo(-1); + assertThat(cfg.get(matchesFirst)).isEqualTo(11); + assertThat(cfg.get(matchesSecond)).isEqualTo(12); + assertThat(cfg.get(matchesThird)).isEqualTo(13); + assertThat(cfg.get(notContained)).isEqualTo(-1); } @TestTemplate @@ -270,12 +271,12 @@ class ConfigurationTest { .withDeprecatedKeys(deprecated.key()); final Configuration fallbackCfg = new Configuration(standardYaml); - fallbackCfg.setInteger(fallback, 1); - 
assertThat(fallbackCfg.getInteger(mainOption)).isOne(); + fallbackCfg.set(fallback, 1); + assertThat(fallbackCfg.get(mainOption)).isOne(); final Configuration deprecatedCfg = new Configuration(standardYaml); - deprecatedCfg.setInteger(deprecated, 2); - assertThat(deprecatedCfg.getInteger(mainOption)).isEqualTo(2); + deprecatedCfg.set(deprecated, 2); + assertThat(deprecatedCfg.get(mainOption)).isEqualTo(2); // reverse declaration of fallback and deprecated keys, fallback keys should always be used // first @@ -287,17 +288,17 @@ class ConfigurationTest { .withFallbackKeys(fallback.key()); final Configuration deprecatedAndFallBackConfig = new Configuration(standardYaml); - deprecatedAndFallBackConfig.setInteger(fallback, 1); - deprecatedAndFallBackConfig.setInteger(deprecated, 2); - assertThat(deprecatedAndFallBackConfig.getInteger(mainOption)).isOne(); - assertThat(deprecatedAndFallBackConfig.getInteger(reversedMainOption)).isOne(); + deprecatedAndFallBackConfig.set(fallback, 1); + deprecatedAndFallBackConfig.set(deprecated, 2); + assertThat(deprecatedAndFallBackConfig.get(mainOption)).isOne(); + assertThat(deprecatedAndFallBackConfig.get(reversedMainOption)).isOne(); } @TestTemplate void testRemove() { Configuration cfg = new Configuration(standardYaml); - cfg.setInteger("a", 1); - cfg.setInteger("b", 2); + cfg.set(getIntConfigOption("a"), 1); + cfg.set(getIntConfigOption("b"), 2); ConfigOption<Integer> validOption = ConfigOptions.key("a").intType().defaultValue(-1); @@ -324,11 +325,11 @@ class ConfigurationTest { Configuration cfg = new Configuration(standardYaml); String key1 = "a.b"; String key2 = "c.d"; - cfg.setInteger(key1, 42); - cfg.setInteger(key2, 44); - cfg.setInteger(key2 + ".f1", 44); - cfg.setInteger(key2 + ".f2", 44); - cfg.setInteger("e.f", 1337); + cfg.set(getIntConfigOption(key1), 42); + cfg.set(getIntConfigOption(key2), 44); + cfg.set(getIntConfigOption(key2 + ".f1"), 44); + cfg.set(getIntConfigOption(key2 + ".f2"), 44); + 
cfg.set(getIntConfigOption("e.f"), 1337); assertThat(cfg.removeKey("not-existing-key")).isFalse(); assertThat(cfg.removeKey(key1)).isTrue(); @@ -451,7 +452,7 @@ class ConfigurationTest { void testMapWithPrefix() { final Configuration cfg = new Configuration(standardYaml); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 12); + cfg.set(getIntConfigOption(MAP_PROPERTY_2), 12); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); assertThat(cfg.contains(MAP_OPTION)).isTrue(); @@ -471,7 +472,7 @@ class ConfigurationTest { final Configuration cfg = new Configuration(standardYaml); cfg.set(MAP_OPTION, PROPERTIES_MAP); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); assertThat(cfg.contains(MAP_OPTION)).isTrue(); @@ -482,7 +483,7 @@ class ConfigurationTest { void testMapThatOverwritesPrefix() { final Configuration cfg = new Configuration(standardYaml); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); cfg.set(MAP_OPTION, PROPERTIES_MAP); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); @@ -494,7 +495,7 @@ class ConfigurationTest { void testMapRemovePrefix() { final Configuration cfg = new Configuration(standardYaml); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); cfg.removeConfig(MAP_OPTION); assertThat(cfg.contains(MAP_OPTION)).isFalse(); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java index 32a08ee3620..b7073cd381e 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -155,13 +155,13 @@ class DelegatingConfigurationTest { ConfigOptions.key("integer.key").intType().noDefaultValue(); // integerOption doesn't exist in delegatingConf, and it should be overrideDefault. - original.setInteger(integerOption, 1); - assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(2); + original.set(integerOption, 1); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2); assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2); // integerOption exists in delegatingConf, and it should be value that set before. delegatingConf.setInteger(integerOption, 3); - assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(3); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3); assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3); // Test for float @@ -217,13 +217,13 @@ class DelegatingConfigurationTest { assertThat(delegatingConf.get(integerOption)).isZero(); delegatingConf.removeConfig(integerOption); assertThat(delegatingConf.getOptional(integerOption)).isEmpty(); - assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero(); + assertThat(delegatingConf.get(integerOption, 0)).isZero(); // Test for removeKey delegatingConf.set(integerOption, 0); - assertThat(delegatingConf.getInteger(integerOption, -1)).isZero(); + assertThat(delegatingConf.get(integerOption, -1)).isZero(); delegatingConf.removeKey(integerOption.key()); assertThat(delegatingConf.getOptional(integerOption)).isEmpty(); - assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero(); + assertThat(delegatingConf.get(integerOption, 0)).isZero(); } } diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java index b9ffc12e293..9e3ac40d101 100644 --- 
a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.net.URI; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests that validate that the configuration for limited FS connections are properly picked up. */ @@ -54,11 +55,11 @@ class LimitedConnectionsConfigurationTest { // configure some limits, which should cause "fsScheme" to be limited final Configuration config = new Configuration(); - config.setInteger("fs." + fsScheme + ".limit.total", 42); - config.setInteger("fs." + fsScheme + ".limit.input", 11); - config.setInteger("fs." + fsScheme + ".limit.output", 40); - config.setInteger("fs." + fsScheme + ".limit.timeout", 12345); - config.setInteger("fs." + fsScheme + ".limit.stream-timeout", 98765); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.total"), 42); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.input"), 11); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.output"), 40); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.timeout"), 12345); + config.set(getIntConfigOption("fs." 
+ fsScheme + ".limit.stream-timeout"), 98765); try { FileSystem.initialize(config); diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java index 4f5cc22e26e..6e0f4f3786c 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java @@ -29,6 +29,7 @@ import java.lang.annotation.RetentionPolicy; import java.net.URI; import java.util.stream.Stream; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the AzureFSFactory. */ @@ -63,7 +64,7 @@ class AzureBlobStorageFSFactoryTest { final URI uri = URI.create(uriString); Configuration config = new Configuration(); - config.setInteger("fs.azure.io.retry.max.retries", 0); + config.set(getIntConfigOption("fs.azure.io.retry.max.retries"), 0); factory.configure(config); assertThatThrownBy(() -> factory.create(uri)).isInstanceOf(AzureException.class); diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java index af39bfd4295..554ad7a5b73 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test; import java.net.URI; +import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** @@ -47,11 +48,11 @@ class LimitedConnectionsConfigurationTest { // configure some limits, which should cause "fsScheme" to be limited final Configuration config = new Configuration(); - config.setInteger("fs.hdfs.limit.total", 40); - config.setInteger("fs.hdfs.limit.input", 39); - config.setInteger("fs.hdfs.limit.output", 38); - config.setInteger("fs.hdfs.limit.timeout", 23456); - config.setInteger("fs.hdfs.limit.stream-timeout", 34567); + config.set(getIntConfigOption("fs.hdfs.limit.total"), 40); + config.set(getIntConfigOption("fs.hdfs.limit.input"), 39); + config.set(getIntConfigOption("fs.hdfs.limit.output"), 38); + config.set(getIntConfigOption("fs.hdfs.limit.timeout"), 23456); + config.set(getIntConfigOption("fs.hdfs.limit.stream-timeout"), 34567); try { FileSystem.initialize(config); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java index 06887d25da4..5b6b60f35ae 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import java.net.URI; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests that the file system factory picks up the entropy configuration properly. 
*/ @@ -33,7 +34,7 @@ class S3EntropyFsFactoryTest { void testEntropyInjectionConfig() throws Exception { final Configuration conf = new Configuration(); conf.setString("s3.entropy.key", "__entropy__"); - conf.setInteger("s3.entropy.length", 7); + conf.set(getIntConfigOption("s3.entropy.length"), 7); TestS3FileSystemFactory factory = new TestS3FileSystemFactory(); factory.configure(conf); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index a40b8aa7309..3dce6c3d2f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -52,6 +52,7 @@ import java.util.List; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; /** Configuration class which stores all relevant parameters required to set up the Pact tasks. 
*/ public class TaskConfig implements Serializable { @@ -348,11 +349,11 @@ public class TaskConfig implements Serializable { } public void setDriverStrategy(DriverStrategy strategy) { - this.config.setInteger(DRIVER_STRATEGY, strategy.ordinal()); + this.config.set(getIntConfigOption(DRIVER_STRATEGY), strategy.ordinal()); } public DriverStrategy getDriverStrategy() { - final int ls = this.config.getInteger(DRIVER_STRATEGY, -1); + final int ls = this.config.get(getIntConfigOption(DRIVER_STRATEGY), -1); if (ls == -1) { return DriverStrategy.NONE; } else if (ls < 0 || ls >= DriverStrategy.values().length) { @@ -415,11 +416,13 @@ public class TaskConfig implements Serializable { // -------------------------------------------------------------------------------------------- public void setInputLocalStrategy(int inputNum, LocalStrategy strategy) { - this.config.setInteger(INPUT_LOCAL_STRATEGY_PREFIX + inputNum, strategy.ordinal()); + this.config.set( + getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX + inputNum), strategy.ordinal()); } public LocalStrategy getInputLocalStrategy(int inputNum) { - final int ls = this.config.getInteger(INPUT_LOCAL_STRATEGY_PREFIX + inputNum, -1); + final int ls = + this.config.get(getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX + inputNum), -1); if (ls == -1) { return LocalStrategy.NONE; } else if (ls < 0 || ls >= LocalStrategy.values().length) { @@ -473,34 +476,38 @@ public class TaskConfig implements Serializable { } public int getNumInputs() { - return this.config.getInteger(NUM_INPUTS, 0); + return this.config.get(getIntConfigOption(NUM_INPUTS), 0); } public int getNumBroadcastInputs() { - return this.config.getInteger(NUM_BROADCAST_INPUTS, 0); + return this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 0); } public int getGroupSize(int groupIndex) { - return this.config.getInteger(INPUT_GROUP_SIZE_PREFIX + groupIndex, -1); + return this.config.get(getIntConfigOption(INPUT_GROUP_SIZE_PREFIX + groupIndex), -1); } public int 
getBroadcastGroupSize(int groupIndex) { - return this.config.getInteger(BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex, -1); + return this.config.get( + getIntConfigOption(BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex), -1); } public void addInputToGroup(int groupIndex) { final String grp = INPUT_GROUP_SIZE_PREFIX + groupIndex; - this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1); - this.config.setInteger(NUM_INPUTS, this.config.getInteger(NUM_INPUTS, 0) + 1); + this.config.set(getIntConfigOption(grp), this.config.get(getIntConfigOption(grp), 0) + 1); + this.config.set( + getIntConfigOption(NUM_INPUTS), + this.config.get(getIntConfigOption(NUM_INPUTS), 0) + 1); } public void addBroadcastInputToGroup(int groupIndex) { final String grp = BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex; if (!this.config.containsKey(grp)) { - this.config.setInteger( - NUM_BROADCAST_INPUTS, this.config.getInteger(NUM_BROADCAST_INPUTS, 0) + 1); + this.config.set( + getIntConfigOption(NUM_BROADCAST_INPUTS), + this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 0) + 1); } - this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1); + this.config.set(getIntConfigOption(grp), this.config.get(getIntConfigOption(grp), 0) + 1); } public void setInputAsynchronouslyMaterialized(int inputNum, boolean temp) { @@ -542,18 +549,19 @@ public class TaskConfig implements Serializable { // -------------------------------------------------------------------------------------------- public void addOutputShipStrategy(ShipStrategyType strategy) { - final int outputCnt = this.config.getInteger(OUTPUTS_NUM, 0); - this.config.setInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt, strategy.ordinal()); - this.config.setInteger(OUTPUTS_NUM, outputCnt + 1); + final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), 0); + this.config.set( + getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt), strategy.ordinal()); + this.config.set(getIntConfigOption(OUTPUTS_NUM), outputCnt 
+ 1); } public int getNumOutputs() { - return this.config.getInteger(OUTPUTS_NUM, 0); + return this.config.get(getIntConfigOption(OUTPUTS_NUM), 0); } public ShipStrategyType getOutputShipStrategy(int outputNum) { // check how many outputs are encoded in the config - final int outputCnt = this.config.getInteger(OUTPUTS_NUM, -1); + final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), -1); if (outputCnt < 1) { throw new CorruptConfigurationException( "No output ship strategies are specified in the configuration."); @@ -564,7 +572,8 @@ public class TaskConfig implements Serializable { throw new IllegalArgumentException("Invalid index for output shipping strategy."); } - final int strategy = this.config.getInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputNum, -1); + final int strategy = + this.config.get(getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX + outputNum), -1); if (strategy == -1) { throw new CorruptConfigurationException( "No output shipping strategy in configuration for output " + outputNum); @@ -705,22 +714,22 @@ public class TaskConfig implements Serializable { if (filehandles < 2) { throw new IllegalArgumentException(); } - this.config.setInteger(FILEHANDLES_DRIVER, filehandles); + this.config.set(getIntConfigOption(FILEHANDLES_DRIVER), filehandles); } public int getFilehandlesDriver() { - return this.config.getInteger(FILEHANDLES_DRIVER, -1); + return this.config.get(getIntConfigOption(FILEHANDLES_DRIVER), -1); } public void setFilehandlesInput(int inputNum, int filehandles) { if (filehandles < 2) { throw new IllegalArgumentException(); } - this.config.setInteger(FILEHANDLES_INPUT_PREFIX + inputNum, filehandles); + this.config.set(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + inputNum), filehandles); } public int getFilehandlesInput(int inputNum) { - return this.config.getInteger(FILEHANDLES_INPUT_PREFIX + inputNum, -1); + return this.config.get(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + inputNum), -1); } // 
-------------------------------------------------------------------------------------------- @@ -763,20 +772,20 @@ public class TaskConfig implements Serializable { // -------------------------------------------------------------------------------------------- public int getNumberOfChainedStubs() { - return this.config.getInteger(CHAINING_NUM_STUBS, 0); + return this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0); } public void addChainedTask( @SuppressWarnings("rawtypes") Class<? extends ChainedDriver> chainedTaskClass, TaskConfig conf, String taskName) { - int numChainedYet = this.config.getInteger(CHAINING_NUM_STUBS, 0); + int numChainedYet = this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0); this.config.setString(CHAINING_TASK_PREFIX + numChainedYet, chainedTaskClass.getName()); this.config.addAll(conf.config, CHAINING_TASKCONFIG_PREFIX + numChainedYet + SEPARATOR); this.config.setString(CHAINING_TASKNAME_PREFIX + numChainedYet, taskName); - this.config.setInteger(CHAINING_NUM_STUBS, ++numChainedYet); + this.config.set(getIntConfigOption(CHAINING_NUM_STUBS), ++numChainedYet); } public TaskConfig getChainedStubConfig(int chainPos) { @@ -813,11 +822,11 @@ public class TaskConfig implements Serializable { if (numberOfIterations <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_ITERATIONS, numberOfIterations); + this.config.set(getIntConfigOption(NUMBER_OF_ITERATIONS), numberOfIterations); } public int getNumberOfIterations() { - int numberOfIterations = this.config.getInteger(NUMBER_OF_ITERATIONS, 0); + int numberOfIterations = this.config.get(getIntConfigOption(NUMBER_OF_ITERATIONS), 0); if (numberOfIterations <= 0) { throw new IllegalArgumentException(); } @@ -828,11 +837,12 @@ public class TaskConfig implements Serializable { if (inputIndex < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, inputIndex); + 
this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), inputIndex); } public int getIterationHeadPartialSolutionOrWorksetInputIndex() { - int index = this.config.getInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, -1); + int index = + this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), -1); if (index < 0) { throw new IllegalArgumentException(); } @@ -843,11 +853,11 @@ public class TaskConfig implements Serializable { if (inputIndex < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, inputIndex); + this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), inputIndex); } public int getIterationHeadSolutionSetInputIndex() { - int index = this.config.getInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, -1); + int index = this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), -1); if (index < 0) { throw new IllegalArgumentException(); } @@ -898,14 +908,15 @@ public class TaskConfig implements Serializable { if (numEvents <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, numEvents); + this.config.set( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex), numEvents); } public int getNumberOfEventsUntilInterruptInIterativeGate(int inputGateIndex) { if (inputGateIndex < 0) { throw new IllegalArgumentException(); } - return this.config.getInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, 0); + return this.config.get(getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex), 0); } public void setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt( @@ -916,25 +927,27 @@ public class TaskConfig implements Serializable { if (numEvents <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, numEvents); + this.config.set( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + 
bcGateIndex), numEvents); } public int getNumberOfEventsUntilInterruptInIterativeBroadcastGate(int bcGateIndex) { if (bcGateIndex < 0) { throw new IllegalArgumentException(); } - return this.config.getInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, 0); + return this.config.get( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex), 0); } public void setIterationId(int id) { if (id < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_ID, id); + this.config.set(getIntConfigOption(ITERATION_HEAD_ID), id); } public int getIterationId() { - int id = this.config.getInteger(ITERATION_HEAD_ID, -1); + int id = this.config.get(getIntConfigOption(ITERATION_HEAD_ID), -1); if (id == -1) { throw new CorruptConfigurationException("Iteration head ID is missing."); } @@ -953,11 +966,11 @@ public class TaskConfig implements Serializable { if (outputIndex < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_SYNC_OUT_INDEX, outputIndex); + this.config.set(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), outputIndex); } public int getIterationHeadIndexOfSyncOutput() { - int outputIndex = this.config.getInteger(ITERATION_HEAD_SYNC_OUT_INDEX, -1); + int outputIndex = this.config.get(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), -1); if (outputIndex < 0) { throw new IllegalArgumentException(); } @@ -1002,7 +1015,7 @@ public class TaskConfig implements Serializable { } public void addIterationAggregator(String name, Aggregator<?> aggregator) { - int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + int num = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, name); try { InstantiationUtil.writeObjectToConfig( @@ -1011,11 +1024,11 @@ public class TaskConfig implements Serializable { throw new RuntimeException( "Error while writing the aggregator object to the task configuration."); } - 
this.config.setInteger(ITERATION_NUM_AGGREGATORS, num + 1); + this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num + 1); } public void addIterationAggregators(Collection<AggregatorWithName<?>> aggregators) { - int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + int num = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); for (AggregatorWithName<?> awn : aggregators) { this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, awn.getName()); try { @@ -1027,12 +1040,12 @@ public class TaskConfig implements Serializable { } num++; } - this.config.setInteger(ITERATION_NUM_AGGREGATORS, num); + this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num); } @SuppressWarnings("unchecked") public Collection<AggregatorWithName<?>> getIterationAggregators(ClassLoader cl) { - final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + final int numAggs = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); if (numAggs == 0) { return Collections.emptyList(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 9ac18bfd908..c0e4214d41e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -84,14 +84,18 @@ public class StreamConfig implements Serializable { */ public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass"; - private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs"; - private static final String NUMBER_OF_NETWORK_INPUTS = "numberOfNetworkInputs"; + private static final ConfigOption<Integer> NUMBER_OF_OUTPUTS = + ConfigOptions.key("numberOfOutputs").intType().defaultValue(0); + private static final ConfigOption<Integer> NUMBER_OF_NETWORK_INPUTS = + 
ConfigOptions.key("numberOfNetworkInputs").intType().defaultValue(0); private static final String CHAINED_OUTPUTS = "chainedOutputs"; private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_"; private static final ConfigOption<Boolean> IS_CHAINED_VERTEX = ConfigOptions.key("isChainedSubtask").booleanType().defaultValue(false); - private static final String CHAIN_INDEX = "chainIndex"; - private static final String VERTEX_NAME = "vertexID"; + private static final ConfigOption<Integer> CHAIN_INDEX = + ConfigOptions.key("chainIndex").intType().defaultValue(0); + private static final ConfigOption<Integer> VERTEX_NAME = + ConfigOptions.key("vertexID").intType().defaultValue(-1); private static final String ITERATION_ID = "iterationId"; private static final String INPUTS = "inputs"; private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out"; @@ -111,7 +115,8 @@ public class StreamConfig implements Serializable { private static final ConfigOption<Boolean> CHECKPOINTING_ENABLED = ConfigOptions.key("checkpointing").booleanType().defaultValue(false); - private static final String CHECKPOINT_MODE = "checkpointMode"; + private static final ConfigOption<Integer> CHECKPOINT_MODE = + ConfigOptions.key("checkpointMode").intType().defaultValue(-1); private static final String SAVEPOINT_DIR = "savepointdir"; private static final String CHECKPOINT_STORAGE = "checkpointstorage"; @@ -122,7 +127,8 @@ public class StreamConfig implements Serializable { private static final String STATE_KEY_SERIALIZER = "statekeyser"; - private static final String TIME_CHARACTERISTIC = "timechar"; + private static final ConfigOption<Integer> TIME_CHARACTERISTIC = + ConfigOptions.key("timechar").intType().defaultValue(-1); private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; private static final ConfigOption<Boolean> STATE_BACKEND_USE_MANAGED_MEMORY = @@ -234,11 +240,11 @@ public class StreamConfig implements Serializable { // 
------------------------------------------------------------------------ public void setVertexID(Integer vertexID) { - config.setInteger(VERTEX_NAME, vertexID); + config.set(VERTEX_NAME, vertexID); } public Integer getVertexID() { - return config.getInteger(VERTEX_NAME, -1); + return config.get(VERTEX_NAME); } /** Fraction of managed memory reserved for the given use case that this operator should use. */ @@ -294,11 +300,11 @@ public class StreamConfig implements Serializable { } public void setTimeCharacteristic(TimeCharacteristic characteristic) { - config.setInteger(TIME_CHARACTERISTIC, characteristic.ordinal()); + config.set(TIME_CHARACTERISTIC, characteristic.ordinal()); } public TimeCharacteristic getTimeCharacteristic() { - int ordinal = config.getInteger(TIME_CHARACTERISTIC, -1); + int ordinal = config.get(TIME_CHARACTERISTIC, -1); if (ordinal >= 0) { return TimeCharacteristic.values()[ordinal]; } else { @@ -449,19 +455,19 @@ public class StreamConfig implements Serializable { } public void setNumberOfNetworkInputs(int numberOfInputs) { - config.setInteger(NUMBER_OF_NETWORK_INPUTS, numberOfInputs); + config.set(NUMBER_OF_NETWORK_INPUTS, numberOfInputs); } public int getNumberOfNetworkInputs() { - return config.getInteger(NUMBER_OF_NETWORK_INPUTS, 0); + return config.get(NUMBER_OF_NETWORK_INPUTS); } public void setNumberOfOutputs(int numberOfOutputs) { - config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs); + config.set(NUMBER_OF_OUTPUTS, numberOfOutputs); } public int getNumberOfOutputs() { - return config.getInteger(NUMBER_OF_OUTPUTS, 0); + return config.get(NUMBER_OF_OUTPUTS); } /** Sets the operator level non-chained outputs. 
*/ @@ -518,11 +524,11 @@ public class StreamConfig implements Serializable { } public void setCheckpointMode(CheckpointingMode mode) { - config.setInteger(CHECKPOINT_MODE, mode.ordinal()); + config.set(CHECKPOINT_MODE, mode.ordinal()); } public CheckpointingMode getCheckpointMode() { - int ordinal = config.getInteger(CHECKPOINT_MODE, -1); + int ordinal = config.get(CHECKPOINT_MODE, -1); if (ordinal >= 0) { return CheckpointingMode.values()[ordinal]; } else { @@ -642,11 +648,11 @@ public class StreamConfig implements Serializable { } public void setChainIndex(int index) { - this.config.setInteger(CHAIN_INDEX, index); + this.config.set(CHAIN_INDEX, index); } public int getChainIndex() { - return this.config.getInteger(CHAIN_INDEX, 0); + return this.config.get(CHAIN_INDEX); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 53822036e33..e19be728a3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -43,6 +43,8 @@ import org.junit.Test; import java.util.Arrays; import java.util.BitSet; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; + /** * Tests that Flink can execute jobs with a higher parallelism than available number of slots. This * effectively tests that Flink can execute jobs with blocking results in a staged fashion. 
@@ -108,12 +110,15 @@ public class SlotCountExceedingParallelismTest extends TestLogger { final JobVertex sender = new JobVertex("Sender"); sender.setInvokableClass(RoundRobinSubtaskIndexSender.class); sender.getConfiguration() - .setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism); + .set( + getIntConfigOption(RoundRobinSubtaskIndexSender.CONFIG_KEY), + receiverParallelism); sender.setParallelism(senderParallelism); final JobVertex receiver = new JobVertex("Receiver"); receiver.setInvokableClass(SubtaskIndexReceiver.class); - receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism); + receiver.getConfiguration() + .set(getIntConfigOption(SubtaskIndexReceiver.CONFIG_KEY), senderParallelism); receiver.setParallelism(receiverParallelism); receiver.connectNewDataSetAsInput( @@ -138,7 +143,8 @@ public class SlotCountExceedingParallelismTest extends TestLogger { public void invoke() throws Exception { RecordWriter<IntValue> writer = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0)); - final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0); + final int numberOfTimesToSend = + getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 0); final IntValue subtaskIndex = new IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask()); @@ -173,7 +179,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger { try { final int numberOfSubtaskIndexesToReceive = - getTaskConfiguration().getInteger(CONFIG_KEY, 0); + getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 0); final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive); IntValue record; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java index 
1c26a4e32d3..d1b24d862f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java @@ -383,7 +383,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest { @Test void testComputeSourceParallelismUpperBound() { Configuration configuration = new Configuration(); - configuration.setInteger( + configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, DEFAULT_SOURCE_PARALLELISM); VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = @@ -408,7 +408,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest { @Test void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() { Configuration configuration = new Configuration(); - configuration.setInteger( + configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, VERTEX_MAX_PARALLELISM * 2); VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java index 97b12bf9509..dfa187736a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.types.Value; import java.util.Arrays; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; /** * A factory for a {@link org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}. 
@@ -101,9 +102,9 @@ public class RecordComparatorFactory implements TypeComparatorFactory<Record> { } // write the config - config.setInteger(NUM_KEYS, this.positions.length); + config.set(getIntConfigOption(NUM_KEYS), this.positions.length); for (int i = 0; i < this.positions.length; i++) { - config.setInteger(KEY_POS_PREFIX + i, this.positions[i]); + config.set(getIntConfigOption(KEY_POS_PREFIX + i), this.positions[i]); config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName()); config.set( getBooleanConfigOption(KEY_SORT_DIRECTION_PREFIX + i), this.sortDirections[i]); @@ -115,7 +116,7 @@ public class RecordComparatorFactory implements TypeComparatorFactory<Record> { public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { // figure out how many key fields there are - final int numKeyFields = config.getInteger(NUM_KEYS, -1); + final int numKeyFields = config.get(getIntConfigOption(NUM_KEYS), -1); if (numKeyFields < 0) { throw new IllegalConfigurationException( "The number of keys for the comparator is invalid: " + numKeyFields); @@ -128,7 +129,7 @@ public class RecordComparatorFactory implements TypeComparatorFactory<Record> { // read the individual key positions and types for (int i = 0; i < numKeyFields; i++) { // next key position - final int p = config.getInteger(KEY_POS_PREFIX + i, -1); + final int p = config.get(getIntConfigOption(KEY_POS_PREFIX + i), -1); if (p >= 0) { positions[i] = p; } else { diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 9e769c99379..1ed6867d100 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; 
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.junit.Assert.fail; /** Manual test to evaluate impact of checkpointing on latency. */ @@ -53,8 +54,8 @@ public class StreamingScalabilityAndLatency { config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("80m")); config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000); - config.setInteger("taskmanager.net.server.numThreads", 1); - config.setInteger("taskmanager.net.client.numThreads", 1); + config.set(getIntConfigOption("taskmanager.net.server.numThreads"), 1); + config.set(getIntConfigOption("taskmanager.net.client.numThreads"), 1); cluster = new MiniClusterWithClientResource( diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java index 0c2b2c716f7..ecb3bed4702 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java @@ -38,6 +38,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.apache.flink.test.util.TestBaseUtils.compareResultAsText; import static org.apache.flink.test.util.TestBaseUtils.compareResultAsTuples; @@ -512,7 +513,7 @@ public class MapITCase extends MultipleProgramsTestBaseJUnit4 { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); Configuration conf = new Configuration(); - conf.setInteger(TEST_KEY, TEST_VALUE); + conf.set(getIntConfigOption(TEST_KEY), TEST_VALUE); DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.map(new RichMapper2()).withParameters(conf); List<Tuple3<Integer, Long, 
String>> result = bcMapDs.collect(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index 8f85add350d..545f6e048d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; /** Manually test the throughput of the network stack. */ public class NetworkStackThroughputITCase extends TestLogger { @@ -90,8 +91,11 @@ public class NetworkStackThroughputITCase extends TestLogger { // Determine the amount of data to send per subtask int dataVolumeGb = getTaskConfiguration() - .getInteger( - NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1); + .get( + getIntConfigOption( + NetworkStackThroughputITCase + .DATA_VOLUME_GB_CONFIG_KEY), + 1); long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks(); long numRecordsToEmit = @@ -331,7 +335,8 @@ public class NetworkStackThroughputITCase extends TestLogger { producer.setInvokableClass(SpeedTestProducer.class); producer.setParallelism(numSubtasks); - producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb); + producer.getConfiguration() + .set(getIntConfigOption(DATA_VOLUME_GB_CONFIG_KEY), dataVolumeGb); producer.getConfiguration() .set(getBooleanConfigOption(IS_SLOW_SENDER_CONFIG_KEY), isSlowSender); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index bc65a5ee93d..20548c619ea 100644 --- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -915,15 +915,15 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { // activate re-execution of failed applications appContext.setMaxAppAttempts( - configuration.getInteger( - YarnConfigOptions.APPLICATION_ATTEMPTS.key(), + configuration.get( + YarnConfigOptions.APPLICATION_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); } else { // set number of application retries to 1 in the default case appContext.setMaxAppAttempts( - configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); + configuration.get(YarnConfigOptions.APPLICATION_ATTEMPTS, 1)); } final Set<Path> userJarFiles = new HashSet<>();
