This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch config-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da9e852f07e80058d39d5c47e1129705dae9ef6f
Author: sxnan <[email protected]>
AuthorDate: Tue Sep 10 11:31:28 2024 +0800

    [FLINK-34079][config] Refactor callers that use get/setInt
---
 .../flink/api/common/cache/DistributedCache.java   |   7 +-
 .../flink/api/common/operators/Operator.java       |  13 ---
 .../flink/configuration/ConfigurationUtils.java    |   4 +
 .../ConfigurationConversionsTest.java              |  53 ++++++-----
 .../flink/configuration/ConfigurationTest.java     |  79 ++++++++--------
 .../configuration/DelegatingConfigurationTest.java |  12 +--
 .../fs/LimitedConnectionsConfigurationTest.java    |  11 ++-
 .../fs/azurefs/AzureBlobStorageFSFactoryTest.java  |   3 +-
 .../hdfs/LimitedConnectionsConfigurationTest.java  |  11 ++-
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |   3 +-
 .../flink/runtime/operators/util/TaskConfig.java   | 103 ++++++++++++---------
 .../flink/streaming/api/graph/StreamConfig.java    |  42 +++++----
 .../SlotCountExceedingParallelismTest.java         |  14 ++-
 ...tVertexParallelismAndInputInfosDeciderTest.java |   4 +-
 .../recordutils/RecordComparatorFactory.java       |   9 +-
 .../manual/StreamingScalabilityAndLatency.java     |   5 +-
 .../org/apache/flink/test/operators/MapITCase.java |   3 +-
 .../test/runtime/NetworkStackThroughputITCase.java |  11 ++-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   6 +-
 19 files changed, 215 insertions(+), 178 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 7049cfa1487..a04d1a49d6e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -42,6 +42,7 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 
 /**
  * DistributedCache provides static methods to write the registered cache 
files into job
@@ -174,8 +175,8 @@ public class DistributedCache {
 
     public static void writeFileInfoToConfig(
             String name, DistributedCacheEntry e, Configuration conf) {
-        int num = conf.getInteger(CACHE_FILE_NUM, 0) + 1;
-        conf.setInteger(CACHE_FILE_NUM, num);
+        int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0) + 1;
+        conf.set(getIntConfigOption(CACHE_FILE_NUM), num);
         conf.setString(CACHE_FILE_NAME + num, name);
         conf.setString(CACHE_FILE_PATH + num, e.filePath);
         conf.set(
@@ -191,7 +192,7 @@ public class DistributedCache {
 
     public static Set<Entry<String, DistributedCacheEntry>> 
readFileInfoFromConfig(
             Configuration conf) {
-        int num = conf.getInteger(CACHE_FILE_NUM, 0);
+        int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0);
         if (num == 0) {
             return Collections.emptySet();
         }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index c8db0d3a11c..e32f4032100 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -131,19 +131,6 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
         this.parameters.setString(key, value);
     }
 
-    /**
-     * Sets a stub parameters in the configuration of this contract. The stub 
parameters are
-     * accessible by the user code at runtime. Parameters that the user code 
needs to access at
-     * runtime to configure its behavior are typically stored as stub 
parameters.
-     *
-     * @see #getParameters()
-     * @param key The parameter key.
-     * @param value The parameter value.
-     */
-    public void setParameter(String key, int value) {
-        this.parameters.setInteger(key, value);
-    }
-
     /**
      * Gets the parallelism for this contract instance. The parallelism 
denotes how many parallel
      * instances of the user function will be spawned during the execution. If 
this value is {@link
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 7acc0145fd9..f561b77cab9 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -703,4 +703,8 @@ public class ConfigurationUtils {
     public static ConfigOption<Float> getFloatConfigOption(String key) {
         return ConfigOptions.key(key).floatType().noDefaultValue();
     }
+
+    public static ConfigOption<Integer> getIntConfigOption(String key) {
+        return ConfigOptions.key(key).intType().noDefaultValue();
+    }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
index dd50d9b216b..d96bcd85e73 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
@@ -37,6 +37,7 @@ import java.util.Optional;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -60,7 +61,7 @@ class ConfigurationConversionsTest {
     void init() {
         pc = new Configuration();
 
-        pc.setInteger("int", 5);
+        pc.set(getIntConfigOption("int"), 5);
         pc.setLong("long", 15);
         pc.setLong("too_long", TOO_LONG);
         pc.set(getFloatConfigOption("float"), 2.1456775f);
@@ -77,7 +78,7 @@ class ConfigurationConversionsTest {
     private static Collection<TestSpec> getSpecs() {
         return Arrays.asList(
                 // from integer
-                TestSpec.whenAccessed(conf -> conf.getInteger("int", 
0)).expect(5),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("int"), 0)).expect(5),
                 TestSpec.whenAccessed(conf -> conf.getLong("int", 
0)).expect(5L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("int"), 0f)).expect(5f),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("int"), 0.0))
@@ -89,7 +90,7 @@ class ConfigurationConversionsTest {
                         .expectException("Configuration cannot evaluate value 
5 as a byte[] value"),
 
                 // from long
-                TestSpec.whenAccessed(conf -> conf.getInteger("long", 
0)).expect(15),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("long"), 0)).expect(15),
                 TestSpec.whenAccessed(conf -> conf.getLong("long", 
0)).expect(15L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("long"), 0f))
                         .expect(15f),
@@ -103,9 +104,8 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 15 as a 
byte[] value"),
 
                 // from too long
-                TestSpec.whenAccessed(conf -> conf.getInteger("too_long", 0))
-                        .expectException(
-                                "Configuration value 2147483657 
overflows/underflows the integer type"),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("too_long"), 0))
+                        .expectException("Could not parse value '2147483657' 
for key 'too_long'."),
                 TestSpec.whenAccessed(conf -> conf.getLong("too_long", 
0)).expect(TOO_LONG),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("too_long"), 0f))
                         .expect((float) TOO_LONG),
@@ -120,9 +120,10 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 
2147483657 as a byte[] value"),
 
                 // from float
-                TestSpec.whenAccessed(conf -> conf.getInteger("float", 0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("float"), 0))
                         .expectException(
-                                "For input string: \"2.1456776\"", 
NumberFormatException.class),
+                                "Could not parse value '2.1456776' for key 
'float'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> conf.getLong("float", 0))
                         .expectException(
                                 "For input string: \"2.1456776\"", 
NumberFormatException.class),
@@ -142,10 +143,10 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 2.1456776 
as a byte[] value"),
 
                 // from double
-                TestSpec.whenAccessed(conf -> conf.getInteger("double", 0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("double"), 0))
                         .expectException(
-                                "For input string: \"3.141592653589793\"",
-                                NumberFormatException.class),
+                                "Could not parse value '3.141592653589793' for 
key 'double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> conf.getLong("double", 0))
                         .expectException(
                                 "For input string: \"3.141592653589793\"",
@@ -164,8 +165,10 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 
3.141592653589793 as a byte[] value"),
 
                 // from negative double
-                TestSpec.whenAccessed(conf -> 
conf.getInteger("negative_double", 0))
-                        .expectException("For input string: \"-1.0\"", 
NumberFormatException.class),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("negative_double"), 0))
+                        .expectException(
+                                "Could not parse value '-1.0' for key 
'negative_double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> conf.getLong("negative_double", 
0))
                         .expectException("For input string: \"-1.0\"", 
NumberFormatException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("negative_double"), 0f))
@@ -183,8 +186,10 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value -1.0 as a 
byte[] value"),
 
                 // from zero
-                TestSpec.whenAccessed(conf -> conf.getInteger("zero", 0))
-                        .expectException("For input string: \"0.0\"", 
NumberFormatException.class),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("zero"), 0))
+                        .expectException(
+                                "Could not parse value '0.0' for key 'zero'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> conf.getLong("zero", 0))
                         .expectException("For input string: \"0.0\"", 
NumberFormatException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("zero"), 0f))
@@ -200,10 +205,10 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 0.0 as a 
byte[] value"),
 
                 // from too long double
-                TestSpec.whenAccessed(conf -> 
conf.getInteger("too_long_double", 0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("too_long_double"), 0))
                         .expectException(
-                                "For input string: \"1.7976931348623157E308\"",
-                                NumberFormatException.class),
+                                "Could not parse value 
'1.7976931348623157E308' for key 'too_long_double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> conf.getLong("too_long_double", 
0))
                         .expectException(
                                 "For input string: \"1.7976931348623157E308\"",
@@ -225,7 +230,7 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 
1.7976931348623157E308 as a byte[] value"),
 
                 // from string
-                TestSpec.whenAccessed(conf -> conf.getInteger("string", 
0)).expect(42),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("string"), 0)).expect(42),
                 TestSpec.whenAccessed(conf -> conf.getLong("string", 
0)).expect(42L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("string"), 0f))
                         .expect(42f),
@@ -239,9 +244,11 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value 42 as a 
byte[] value"),
 
                 // from non convertible string
-                TestSpec.whenAccessed(conf -> 
conf.getInteger("non_convertible_string", 0))
+                TestSpec.whenAccessed(
+                                conf -> 
conf.get(getIntConfigOption("non_convertible_string"), 0))
                         .expectException(
-                                "For input string: \"bcdefg&&\"", 
NumberFormatException.class),
+                                "Could not parse value 'bcdefg&&' for key 
'non_convertible_string'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.getLong("non_convertible_string", 0))
                         .expectException(
                                 "For input string: \"bcdefg&&\"", 
NumberFormatException.class),
@@ -274,8 +281,8 @@ class ConfigurationConversionsTest {
                                 "Configuration cannot evaluate value bcdefg&& 
as a byte[] value"),
 
                 // from boolean
-                TestSpec.whenAccessed(conf -> conf.getInteger("boolean", 0))
-                        .expectException("For input string: \"true\""),
+                TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("boolean"), 0))
+                        .expectException("Could not parse value 'true' for key 
'boolean'."),
                 TestSpec.whenAccessed(conf -> conf.getLong("boolean", 0))
                         .expectException("For input string: \"true\""),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("boolean"), 0f))
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 85f9b6006ed..6363d748d5e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -86,7 +87,7 @@ class ConfigurationTest {
     void testConfigurationSerializationAndGetters() throws 
ClassNotFoundException, IOException {
         final Configuration orig = new Configuration(standardYaml);
         orig.setString("mykey", "myvalue");
-        orig.setInteger("mynumber", 100);
+        orig.set(getIntConfigOption("mynumber"), 100);
         orig.setLong("longvalue", 478236947162389746L);
         orig.set(getFloatConfigOption("PI"), 3.1415926f);
         orig.set(getDoubleConfigOption("E"), Math.E);
@@ -95,7 +96,7 @@ class ConfigurationTest {
 
         final Configuration copy = InstantiationUtil.createCopyWritable(orig);
         assertThat("myvalue").isEqualTo(copy.getString("mykey", "null"));
-        assertThat(copy.getInteger("mynumber", 0)).isEqualTo(100);
+        assertThat(copy.get(getIntConfigOption("mynumber"), 0)).isEqualTo(100);
         assertThat(copy.getLong("longvalue", 
0L)).isEqualTo(478236947162389746L);
         assertThat(copy.get(getFloatConfigOption("PI"), 3.1415926f))
                 .isCloseTo(3.1415926f, Offset.offset(0.0f));
@@ -124,7 +125,7 @@ class ConfigurationTest {
     @TestTemplate
     void testOptionWithDefault() {
         Configuration cfg = new Configuration(standardYaml);
-        cfg.setInteger("int-key", 11);
+        cfg.set(getIntConfigOption("int-key"), 11);
         cfg.setString("string-key", "abc");
 
         ConfigOption<String> presentStringOption =
@@ -135,7 +136,7 @@ class ConfigurationTest {
         assertThat(cfg.getString(presentStringOption)).isEqualTo("abc");
         assertThat(cfg.getValue(presentStringOption)).isEqualTo("abc");
 
-        assertThat(cfg.getInteger(presentIntOption)).isEqualTo(11);
+        assertThat(cfg.get(presentIntOption)).isEqualTo(11);
         assertThat(cfg.getValue(presentIntOption)).isEqualTo("11");
 
         // test getting default when no value is present
@@ -152,14 +153,14 @@ class ConfigurationTest {
         assertThat(cfg.getString(stringOption, 
"override")).isEqualTo("override");
 
         // getting a primitive with a default value should work
-        assertThat(cfg.getInteger(intOption)).isEqualTo(87);
+        assertThat(cfg.get(intOption)).isEqualTo(87);
         assertThat(cfg.getValue(intOption)).isEqualTo("87");
     }
 
     @TestTemplate
     void testOptionWithNoDefault() {
         Configuration cfg = new Configuration(standardYaml);
-        cfg.setInteger("int-key", 11);
+        cfg.set(getIntConfigOption("int-key"), 11);
         cfg.setString("string-key", "abc");
 
         ConfigOption<String> presentStringOption =
@@ -183,9 +184,9 @@ class ConfigurationTest {
     @TestTemplate
     void testDeprecatedKeys() {
         Configuration cfg = new Configuration(standardYaml);
-        cfg.setInteger("the-key", 11);
-        cfg.setInteger("old-key", 12);
-        cfg.setInteger("older-key", 13);
+        cfg.set(getIntConfigOption("the-key"), 11);
+        cfg.set(getIntConfigOption("old-key"), 12);
+        cfg.set(getIntConfigOption("older-key"), 13);
 
         ConfigOption<Integer> matchesFirst =
                 ConfigOptions.key("the-key")
@@ -211,18 +212,18 @@ class ConfigurationTest {
                         .defaultValue(-1)
                         .withDeprecatedKeys("not-there", "also-not-there");
 
-        assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11);
-        assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12);
-        assertThat(cfg.getInteger(matchesThird)).isEqualTo(13);
-        assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
+        assertThat(cfg.get(matchesFirst)).isEqualTo(11);
+        assertThat(cfg.get(matchesSecond)).isEqualTo(12);
+        assertThat(cfg.get(matchesThird)).isEqualTo(13);
+        assertThat(cfg.get(notContained)).isEqualTo(-1);
     }
 
     @TestTemplate
     void testFallbackKeys() {
         Configuration cfg = new Configuration(standardYaml);
-        cfg.setInteger("the-key", 11);
-        cfg.setInteger("old-key", 12);
-        cfg.setInteger("older-key", 13);
+        cfg.set(getIntConfigOption("the-key"), 11);
+        cfg.set(getIntConfigOption("old-key"), 12);
+        cfg.set(getIntConfigOption("older-key"), 13);
 
         ConfigOption<Integer> matchesFirst =
                 ConfigOptions.key("the-key")
@@ -248,10 +249,10 @@ class ConfigurationTest {
                         .defaultValue(-1)
                         .withFallbackKeys("not-there", "also-not-there");
 
-        assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11);
-        assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12);
-        assertThat(cfg.getInteger(matchesThird)).isEqualTo(13);
-        assertThat(cfg.getInteger(notContained)).isEqualTo(-1);
+        assertThat(cfg.get(matchesFirst)).isEqualTo(11);
+        assertThat(cfg.get(matchesSecond)).isEqualTo(12);
+        assertThat(cfg.get(matchesThird)).isEqualTo(13);
+        assertThat(cfg.get(notContained)).isEqualTo(-1);
     }
 
     @TestTemplate
@@ -270,12 +271,12 @@ class ConfigurationTest {
                         .withDeprecatedKeys(deprecated.key());
 
         final Configuration fallbackCfg = new Configuration(standardYaml);
-        fallbackCfg.setInteger(fallback, 1);
-        assertThat(fallbackCfg.getInteger(mainOption)).isOne();
+        fallbackCfg.set(fallback, 1);
+        assertThat(fallbackCfg.get(mainOption)).isOne();
 
         final Configuration deprecatedCfg = new Configuration(standardYaml);
-        deprecatedCfg.setInteger(deprecated, 2);
-        assertThat(deprecatedCfg.getInteger(mainOption)).isEqualTo(2);
+        deprecatedCfg.set(deprecated, 2);
+        assertThat(deprecatedCfg.get(mainOption)).isEqualTo(2);
 
         // reverse declaration of fallback and deprecated keys, fallback keys 
should always be used
         // first
@@ -287,17 +288,17 @@ class ConfigurationTest {
                         .withFallbackKeys(fallback.key());
 
         final Configuration deprecatedAndFallBackConfig = new 
Configuration(standardYaml);
-        deprecatedAndFallBackConfig.setInteger(fallback, 1);
-        deprecatedAndFallBackConfig.setInteger(deprecated, 2);
-        assertThat(deprecatedAndFallBackConfig.getInteger(mainOption)).isOne();
-        
assertThat(deprecatedAndFallBackConfig.getInteger(reversedMainOption)).isOne();
+        deprecatedAndFallBackConfig.set(fallback, 1);
+        deprecatedAndFallBackConfig.set(deprecated, 2);
+        assertThat(deprecatedAndFallBackConfig.get(mainOption)).isOne();
+        
assertThat(deprecatedAndFallBackConfig.get(reversedMainOption)).isOne();
     }
 
     @TestTemplate
     void testRemove() {
         Configuration cfg = new Configuration(standardYaml);
-        cfg.setInteger("a", 1);
-        cfg.setInteger("b", 2);
+        cfg.set(getIntConfigOption("a"), 1);
+        cfg.set(getIntConfigOption("b"), 2);
 
         ConfigOption<Integer> validOption = 
ConfigOptions.key("a").intType().defaultValue(-1);
 
@@ -324,11 +325,11 @@ class ConfigurationTest {
         Configuration cfg = new Configuration(standardYaml);
         String key1 = "a.b";
         String key2 = "c.d";
-        cfg.setInteger(key1, 42);
-        cfg.setInteger(key2, 44);
-        cfg.setInteger(key2 + ".f1", 44);
-        cfg.setInteger(key2 + ".f2", 44);
-        cfg.setInteger("e.f", 1337);
+        cfg.set(getIntConfigOption(key1), 42);
+        cfg.set(getIntConfigOption(key2), 44);
+        cfg.set(getIntConfigOption(key2 + ".f1"), 44);
+        cfg.set(getIntConfigOption(key2 + ".f2"), 44);
+        cfg.set(getIntConfigOption("e.f"), 1337);
 
         assertThat(cfg.removeKey("not-existing-key")).isFalse();
         assertThat(cfg.removeKey(key1)).isTrue();
@@ -451,7 +452,7 @@ class ConfigurationTest {
     void testMapWithPrefix() {
         final Configuration cfg = new Configuration(standardYaml);
         cfg.setString(MAP_PROPERTY_1, "value1");
-        cfg.setInteger(MAP_PROPERTY_2, 12);
+        cfg.set(getIntConfigOption(MAP_PROPERTY_2), 12);
 
         assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP);
         assertThat(cfg.contains(MAP_OPTION)).isTrue();
@@ -471,7 +472,7 @@ class ConfigurationTest {
         final Configuration cfg = new Configuration(standardYaml);
         cfg.set(MAP_OPTION, PROPERTIES_MAP);
         cfg.setString(MAP_PROPERTY_1, "value1");
-        cfg.setInteger(MAP_PROPERTY_2, 99999);
+        cfg.set(getIntConfigOption(MAP_PROPERTY_2), 99999);
 
         assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP);
         assertThat(cfg.contains(MAP_OPTION)).isTrue();
@@ -482,7 +483,7 @@ class ConfigurationTest {
     void testMapThatOverwritesPrefix() {
         final Configuration cfg = new Configuration(standardYaml);
         cfg.setString(MAP_PROPERTY_1, "value1");
-        cfg.setInteger(MAP_PROPERTY_2, 99999);
+        cfg.set(getIntConfigOption(MAP_PROPERTY_2), 99999);
         cfg.set(MAP_OPTION, PROPERTIES_MAP);
 
         assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP);
@@ -494,7 +495,7 @@ class ConfigurationTest {
     void testMapRemovePrefix() {
         final Configuration cfg = new Configuration(standardYaml);
         cfg.setString(MAP_PROPERTY_1, "value1");
-        cfg.setInteger(MAP_PROPERTY_2, 99999);
+        cfg.set(getIntConfigOption(MAP_PROPERTY_2), 99999);
         cfg.removeConfig(MAP_OPTION);
 
         assertThat(cfg.contains(MAP_OPTION)).isFalse();
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index 32a08ee3620..b7073cd381e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -155,13 +155,13 @@ class DelegatingConfigurationTest {
                 ConfigOptions.key("integer.key").intType().noDefaultValue();
 
         // integerOption doesn't exist in delegatingConf, and it should be 
overrideDefault.
-        original.setInteger(integerOption, 1);
-        assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(2);
+        original.set(integerOption, 1);
+        assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2);
         assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2);
 
         // integerOption exists in delegatingConf, and it should be value that 
set before.
         delegatingConf.setInteger(integerOption, 3);
-        assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(3);
+        assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3);
         assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3);
 
         // Test for float
@@ -217,13 +217,13 @@ class DelegatingConfigurationTest {
         assertThat(delegatingConf.get(integerOption)).isZero();
         delegatingConf.removeConfig(integerOption);
         assertThat(delegatingConf.getOptional(integerOption)).isEmpty();
-        assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero();
+        assertThat(delegatingConf.get(integerOption, 0)).isZero();
 
         // Test for removeKey
         delegatingConf.set(integerOption, 0);
-        assertThat(delegatingConf.getInteger(integerOption, -1)).isZero();
+        assertThat(delegatingConf.get(integerOption, -1)).isZero();
         delegatingConf.removeKey(integerOption.key());
         assertThat(delegatingConf.getOptional(integerOption)).isEmpty();
-        assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero();
+        assertThat(delegatingConf.get(integerOption, 0)).isZero();
     }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
index b9ffc12e293..9e3ac40d101 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.net.URI;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests that validate that the configuration for limited FS connections are 
properly picked up. */
@@ -54,11 +55,11 @@ class LimitedConnectionsConfigurationTest {
         // configure some limits, which should cause "fsScheme" to be limited
 
         final Configuration config = new Configuration();
-        config.setInteger("fs." + fsScheme + ".limit.total", 42);
-        config.setInteger("fs." + fsScheme + ".limit.input", 11);
-        config.setInteger("fs." + fsScheme + ".limit.output", 40);
-        config.setInteger("fs." + fsScheme + ".limit.timeout", 12345);
-        config.setInteger("fs." + fsScheme + ".limit.stream-timeout", 98765);
+        config.set(getIntConfigOption("fs." + fsScheme + ".limit.total"), 42);
+        config.set(getIntConfigOption("fs." + fsScheme + ".limit.input"), 11);
+        config.set(getIntConfigOption("fs." + fsScheme + ".limit.output"), 40);
+        config.set(getIntConfigOption("fs." + fsScheme + ".limit.timeout"), 
12345);
+        config.set(getIntConfigOption("fs." + fsScheme + 
".limit.stream-timeout"), 98765);
 
         try {
             FileSystem.initialize(config);
diff --git 
a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java
 
b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java
index 4f5cc22e26e..6e0f4f3786c 100644
--- 
a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java
+++ 
b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java
@@ -29,6 +29,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.net.URI;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the AzureFSFactory. */
@@ -63,7 +64,7 @@ class AzureBlobStorageFSFactoryTest {
         final URI uri = URI.create(uriString);
 
         Configuration config = new Configuration();
-        config.setInteger("fs.azure.io.retry.max.retries", 0);
+        config.set(getIntConfigOption("fs.azure.io.retry.max.retries"), 0);
         factory.configure(config);
 
         assertThatThrownBy(() -> 
factory.create(uri)).isInstanceOf(AzureException.class);
diff --git 
a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
index af39bfd4295..554ad7a5b73 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 
 import java.net.URI;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -47,11 +48,11 @@ class LimitedConnectionsConfigurationTest {
         // configure some limits, which should cause "fsScheme" to be limited
 
         final Configuration config = new Configuration();
-        config.setInteger("fs.hdfs.limit.total", 40);
-        config.setInteger("fs.hdfs.limit.input", 39);
-        config.setInteger("fs.hdfs.limit.output", 38);
-        config.setInteger("fs.hdfs.limit.timeout", 23456);
-        config.setInteger("fs.hdfs.limit.stream-timeout", 34567);
+        config.set(getIntConfigOption("fs.hdfs.limit.total"), 40);
+        config.set(getIntConfigOption("fs.hdfs.limit.input"), 39);
+        config.set(getIntConfigOption("fs.hdfs.limit.output"), 38);
+        config.set(getIntConfigOption("fs.hdfs.limit.timeout"), 23456);
+        config.set(getIntConfigOption("fs.hdfs.limit.stream-timeout"), 34567);
 
         try {
             FileSystem.initialize(config);
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 06887d25da4..5b6b60f35ae 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import java.net.URI;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests that the file system factory picks up the entropy configuration 
properly. */
@@ -33,7 +34,7 @@ class S3EntropyFsFactoryTest {
     void testEntropyInjectionConfig() throws Exception {
         final Configuration conf = new Configuration();
         conf.setString("s3.entropy.key", "__entropy__");
-        conf.setInteger("s3.entropy.length", 7);
+        conf.set(getIntConfigOption("s3.entropy.length"), 7);
 
         TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
         factory.configure(conf);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index a40b8aa7309..3dce6c3d2f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -52,6 +52,7 @@ import java.util.List;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 
 /** Configuration class which stores all relevant parameters required to set 
up the Pact tasks. */
 public class TaskConfig implements Serializable {
@@ -348,11 +349,11 @@ public class TaskConfig implements Serializable {
     }
 
     public void setDriverStrategy(DriverStrategy strategy) {
-        this.config.setInteger(DRIVER_STRATEGY, strategy.ordinal());
+        this.config.set(getIntConfigOption(DRIVER_STRATEGY), 
strategy.ordinal());
     }
 
     public DriverStrategy getDriverStrategy() {
-        final int ls = this.config.getInteger(DRIVER_STRATEGY, -1);
+        final int ls = this.config.get(getIntConfigOption(DRIVER_STRATEGY), 
-1);
         if (ls == -1) {
             return DriverStrategy.NONE;
         } else if (ls < 0 || ls >= DriverStrategy.values().length) {
@@ -415,11 +416,13 @@ public class TaskConfig implements Serializable {
     // 
--------------------------------------------------------------------------------------------
 
     public void setInputLocalStrategy(int inputNum, LocalStrategy strategy) {
-        this.config.setInteger(INPUT_LOCAL_STRATEGY_PREFIX + inputNum, 
strategy.ordinal());
+        this.config.set(
+                getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX + inputNum), 
strategy.ordinal());
     }
 
     public LocalStrategy getInputLocalStrategy(int inputNum) {
-        final int ls = this.config.getInteger(INPUT_LOCAL_STRATEGY_PREFIX + 
inputNum, -1);
+        final int ls =
+                this.config.get(getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX 
+ inputNum), -1);
         if (ls == -1) {
             return LocalStrategy.NONE;
         } else if (ls < 0 || ls >= LocalStrategy.values().length) {
@@ -473,34 +476,38 @@ public class TaskConfig implements Serializable {
     }
 
     public int getNumInputs() {
-        return this.config.getInteger(NUM_INPUTS, 0);
+        return this.config.get(getIntConfigOption(NUM_INPUTS), 0);
     }
 
     public int getNumBroadcastInputs() {
-        return this.config.getInteger(NUM_BROADCAST_INPUTS, 0);
+        return this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 0);
     }
 
     public int getGroupSize(int groupIndex) {
-        return this.config.getInteger(INPUT_GROUP_SIZE_PREFIX + groupIndex, 
-1);
+        return this.config.get(getIntConfigOption(INPUT_GROUP_SIZE_PREFIX + 
groupIndex), -1);
     }
 
     public int getBroadcastGroupSize(int groupIndex) {
-        return this.config.getInteger(BROADCAST_INPUT_GROUP_SIZE_PREFIX + 
groupIndex, -1);
+        return this.config.get(
+                getIntConfigOption(BROADCAST_INPUT_GROUP_SIZE_PREFIX + 
groupIndex), -1);
     }
 
     public void addInputToGroup(int groupIndex) {
         final String grp = INPUT_GROUP_SIZE_PREFIX + groupIndex;
-        this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1);
-        this.config.setInteger(NUM_INPUTS, this.config.getInteger(NUM_INPUTS, 
0) + 1);
+        this.config.set(getIntConfigOption(grp), 
this.config.get(getIntConfigOption(grp), 0) + 1);
+        this.config.set(
+                getIntConfigOption(NUM_INPUTS),
+                this.config.get(getIntConfigOption(NUM_INPUTS), 0) + 1);
     }
 
     public void addBroadcastInputToGroup(int groupIndex) {
         final String grp = BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex;
         if (!this.config.containsKey(grp)) {
-            this.config.setInteger(
-                    NUM_BROADCAST_INPUTS, 
this.config.getInteger(NUM_BROADCAST_INPUTS, 0) + 1);
+            this.config.set(
+                    getIntConfigOption(NUM_BROADCAST_INPUTS),
+                    this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 
0) + 1);
         }
-        this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1);
+        this.config.set(getIntConfigOption(grp), 
this.config.get(getIntConfigOption(grp), 0) + 1);
     }
 
     public void setInputAsynchronouslyMaterialized(int inputNum, boolean temp) 
{
@@ -542,18 +549,19 @@ public class TaskConfig implements Serializable {
     // 
--------------------------------------------------------------------------------------------
 
     public void addOutputShipStrategy(ShipStrategyType strategy) {
-        final int outputCnt = this.config.getInteger(OUTPUTS_NUM, 0);
-        this.config.setInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt, 
strategy.ordinal());
-        this.config.setInteger(OUTPUTS_NUM, outputCnt + 1);
+        final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), 
0);
+        this.config.set(
+                getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt), 
strategy.ordinal());
+        this.config.set(getIntConfigOption(OUTPUTS_NUM), outputCnt + 1);
     }
 
     public int getNumOutputs() {
-        return this.config.getInteger(OUTPUTS_NUM, 0);
+        return this.config.get(getIntConfigOption(OUTPUTS_NUM), 0);
     }
 
     public ShipStrategyType getOutputShipStrategy(int outputNum) {
         // check how many outputs are encoded in the config
-        final int outputCnt = this.config.getInteger(OUTPUTS_NUM, -1);
+        final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), 
-1);
         if (outputCnt < 1) {
             throw new CorruptConfigurationException(
                     "No output ship strategies are specified in the 
configuration.");
@@ -564,7 +572,8 @@ public class TaskConfig implements Serializable {
             throw new IllegalArgumentException("Invalid index for output 
shipping strategy.");
         }
 
-        final int strategy = 
this.config.getInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputNum, -1);
+        final int strategy =
+                this.config.get(getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX 
+ outputNum), -1);
         if (strategy == -1) {
             throw new CorruptConfigurationException(
                     "No output shipping strategy in configuration for output " 
+ outputNum);
@@ -705,22 +714,22 @@ public class TaskConfig implements Serializable {
         if (filehandles < 2) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(FILEHANDLES_DRIVER, filehandles);
+        this.config.set(getIntConfigOption(FILEHANDLES_DRIVER), filehandles);
     }
 
     public int getFilehandlesDriver() {
-        return this.config.getInteger(FILEHANDLES_DRIVER, -1);
+        return this.config.get(getIntConfigOption(FILEHANDLES_DRIVER), -1);
     }
 
     public void setFilehandlesInput(int inputNum, int filehandles) {
         if (filehandles < 2) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(FILEHANDLES_INPUT_PREFIX + inputNum, 
filehandles);
+        this.config.set(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + 
inputNum), filehandles);
     }
 
     public int getFilehandlesInput(int inputNum) {
-        return this.config.getInteger(FILEHANDLES_INPUT_PREFIX + inputNum, -1);
+        return this.config.get(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + 
inputNum), -1);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -763,20 +772,20 @@ public class TaskConfig implements Serializable {
     // 
--------------------------------------------------------------------------------------------
 
     public int getNumberOfChainedStubs() {
-        return this.config.getInteger(CHAINING_NUM_STUBS, 0);
+        return this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0);
     }
 
     public void addChainedTask(
             @SuppressWarnings("rawtypes") Class<? extends ChainedDriver> 
chainedTaskClass,
             TaskConfig conf,
             String taskName) {
-        int numChainedYet = this.config.getInteger(CHAINING_NUM_STUBS, 0);
+        int numChainedYet = 
this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0);
 
         this.config.setString(CHAINING_TASK_PREFIX + numChainedYet, 
chainedTaskClass.getName());
         this.config.addAll(conf.config, CHAINING_TASKCONFIG_PREFIX + 
numChainedYet + SEPARATOR);
         this.config.setString(CHAINING_TASKNAME_PREFIX + numChainedYet, 
taskName);
 
-        this.config.setInteger(CHAINING_NUM_STUBS, ++numChainedYet);
+        this.config.set(getIntConfigOption(CHAINING_NUM_STUBS), 
++numChainedYet);
     }
 
     public TaskConfig getChainedStubConfig(int chainPos) {
@@ -813,11 +822,11 @@ public class TaskConfig implements Serializable {
         if (numberOfIterations <= 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(NUMBER_OF_ITERATIONS, numberOfIterations);
+        this.config.set(getIntConfigOption(NUMBER_OF_ITERATIONS), 
numberOfIterations);
     }
 
     public int getNumberOfIterations() {
-        int numberOfIterations = this.config.getInteger(NUMBER_OF_ITERATIONS, 
0);
+        int numberOfIterations = 
this.config.get(getIntConfigOption(NUMBER_OF_ITERATIONS), 0);
         if (numberOfIterations <= 0) {
             throw new IllegalArgumentException();
         }
@@ -828,11 +837,12 @@ public class TaskConfig implements Serializable {
         if (inputIndex < 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, 
inputIndex);
+        
this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), 
inputIndex);
     }
 
     public int getIterationHeadPartialSolutionOrWorksetInputIndex() {
-        int index = 
this.config.getInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, -1);
+        int index =
+                
this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), 
-1);
         if (index < 0) {
             throw new IllegalArgumentException();
         }
@@ -843,11 +853,11 @@ public class TaskConfig implements Serializable {
         if (inputIndex < 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, 
inputIndex);
+        
this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), 
inputIndex);
     }
 
     public int getIterationHeadSolutionSetInputIndex() {
-        int index = 
this.config.getInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, -1);
+        int index = 
this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), -1);
         if (index < 0) {
             throw new IllegalArgumentException();
         }
@@ -898,14 +908,15 @@ public class TaskConfig implements Serializable {
         if (numEvents <= 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, 
numEvents);
+        this.config.set(
+                getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX + 
inputGateIndex), numEvents);
     }
 
     public int getNumberOfEventsUntilInterruptInIterativeGate(int 
inputGateIndex) {
         if (inputGateIndex < 0) {
             throw new IllegalArgumentException();
         }
-        return this.config.getInteger(NUMBER_OF_EOS_EVENTS_PREFIX + 
inputGateIndex, 0);
+        return this.config.get(getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX 
+ inputGateIndex), 0);
     }
 
     public void setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(
@@ -916,25 +927,27 @@ public class TaskConfig implements Serializable {
         if (numEvents <= 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + 
bcGateIndex, numEvents);
+        this.config.set(
+                getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + 
bcGateIndex), numEvents);
     }
 
     public int getNumberOfEventsUntilInterruptInIterativeBroadcastGate(int 
bcGateIndex) {
         if (bcGateIndex < 0) {
             throw new IllegalArgumentException();
         }
-        return this.config.getInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + 
bcGateIndex, 0);
+        return this.config.get(
+                getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + 
bcGateIndex), 0);
     }
 
     public void setIterationId(int id) {
         if (id < 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(ITERATION_HEAD_ID, id);
+        this.config.set(getIntConfigOption(ITERATION_HEAD_ID), id);
     }
 
     public int getIterationId() {
-        int id = this.config.getInteger(ITERATION_HEAD_ID, -1);
+        int id = this.config.get(getIntConfigOption(ITERATION_HEAD_ID), -1);
         if (id == -1) {
             throw new CorruptConfigurationException("Iteration head ID is 
missing.");
         }
@@ -953,11 +966,11 @@ public class TaskConfig implements Serializable {
         if (outputIndex < 0) {
             throw new IllegalArgumentException();
         }
-        this.config.setInteger(ITERATION_HEAD_SYNC_OUT_INDEX, outputIndex);
+        this.config.set(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), 
outputIndex);
     }
 
     public int getIterationHeadIndexOfSyncOutput() {
-        int outputIndex = 
this.config.getInteger(ITERATION_HEAD_SYNC_OUT_INDEX, -1);
+        int outputIndex = 
this.config.get(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), -1);
         if (outputIndex < 0) {
             throw new IllegalArgumentException();
         }
@@ -1002,7 +1015,7 @@ public class TaskConfig implements Serializable {
     }
 
     public void addIterationAggregator(String name, Aggregator<?> aggregator) {
-        int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0);
+        int num = 
this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0);
         this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, name);
         try {
             InstantiationUtil.writeObjectToConfig(
@@ -1011,11 +1024,11 @@ public class TaskConfig implements Serializable {
             throw new RuntimeException(
                     "Error while writing the aggregator object to the task 
configuration.");
         }
-        this.config.setInteger(ITERATION_NUM_AGGREGATORS, num + 1);
+        this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num + 
1);
     }
 
     public void addIterationAggregators(Collection<AggregatorWithName<?>> 
aggregators) {
-        int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0);
+        int num = 
this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0);
         for (AggregatorWithName<?> awn : aggregators) {
             this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, 
awn.getName());
             try {
@@ -1027,12 +1040,12 @@ public class TaskConfig implements Serializable {
             }
             num++;
         }
-        this.config.setInteger(ITERATION_NUM_AGGREGATORS, num);
+        this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num);
     }
 
     @SuppressWarnings("unchecked")
     public Collection<AggregatorWithName<?>> 
getIterationAggregators(ClassLoader cl) {
-        final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 
0);
+        final int numAggs = 
this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0);
         if (numAggs == 0) {
             return Collections.emptyList();
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 9ac18bfd908..c0e4214d41e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -84,14 +84,18 @@ public class StreamConfig implements Serializable {
      */
     public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass";
 
-    private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
-    private static final String NUMBER_OF_NETWORK_INPUTS = 
"numberOfNetworkInputs";
+    private static final ConfigOption<Integer> NUMBER_OF_OUTPUTS =
+            ConfigOptions.key("numberOfOutputs").intType().defaultValue(0);
+    private static final ConfigOption<Integer> NUMBER_OF_NETWORK_INPUTS =
+            
ConfigOptions.key("numberOfNetworkInputs").intType().defaultValue(0);
     private static final String CHAINED_OUTPUTS = "chainedOutputs";
     private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
     private static final ConfigOption<Boolean> IS_CHAINED_VERTEX =
             
ConfigOptions.key("isChainedSubtask").booleanType().defaultValue(false);
-    private static final String CHAIN_INDEX = "chainIndex";
-    private static final String VERTEX_NAME = "vertexID";
+    private static final ConfigOption<Integer> CHAIN_INDEX =
+            ConfigOptions.key("chainIndex").intType().defaultValue(0);
+    private static final ConfigOption<Integer> VERTEX_NAME =
+            ConfigOptions.key("vertexID").intType().defaultValue(-1);
     private static final String ITERATION_ID = "iterationId";
     private static final String INPUTS = "inputs";
     private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
@@ -111,7 +115,8 @@ public class StreamConfig implements Serializable {
     private static final ConfigOption<Boolean> CHECKPOINTING_ENABLED =
             
ConfigOptions.key("checkpointing").booleanType().defaultValue(false);
 
-    private static final String CHECKPOINT_MODE = "checkpointMode";
+    private static final ConfigOption<Integer> CHECKPOINT_MODE =
+            ConfigOptions.key("checkpointMode").intType().defaultValue(-1);
 
     private static final String SAVEPOINT_DIR = "savepointdir";
     private static final String CHECKPOINT_STORAGE = "checkpointstorage";
@@ -122,7 +127,8 @@ public class StreamConfig implements Serializable {
 
     private static final String STATE_KEY_SERIALIZER = "statekeyser";
 
-    private static final String TIME_CHARACTERISTIC = "timechar";
+    private static final ConfigOption<Integer> TIME_CHARACTERISTIC =
+            ConfigOptions.key("timechar").intType().defaultValue(-1);
 
     private static final String MANAGED_MEMORY_FRACTION_PREFIX = 
"managedMemFraction.";
     private static final ConfigOption<Boolean> 
STATE_BACKEND_USE_MANAGED_MEMORY =
@@ -234,11 +240,11 @@ public class StreamConfig implements Serializable {
     // ------------------------------------------------------------------------
 
     public void setVertexID(Integer vertexID) {
-        config.setInteger(VERTEX_NAME, vertexID);
+        config.set(VERTEX_NAME, vertexID);
     }
 
     public Integer getVertexID() {
-        return config.getInteger(VERTEX_NAME, -1);
+        return config.get(VERTEX_NAME);
     }
 
     /** Fraction of managed memory reserved for the given use case that this 
operator should use. */
@@ -294,11 +300,11 @@ public class StreamConfig implements Serializable {
     }
 
     public void setTimeCharacteristic(TimeCharacteristic characteristic) {
-        config.setInteger(TIME_CHARACTERISTIC, characteristic.ordinal());
+        config.set(TIME_CHARACTERISTIC, characteristic.ordinal());
     }
 
     public TimeCharacteristic getTimeCharacteristic() {
-        int ordinal = config.getInteger(TIME_CHARACTERISTIC, -1);
+        int ordinal = config.get(TIME_CHARACTERISTIC, -1);
         if (ordinal >= 0) {
             return TimeCharacteristic.values()[ordinal];
         } else {
@@ -449,19 +455,19 @@ public class StreamConfig implements Serializable {
     }
 
     public void setNumberOfNetworkInputs(int numberOfInputs) {
-        config.setInteger(NUMBER_OF_NETWORK_INPUTS, numberOfInputs);
+        config.set(NUMBER_OF_NETWORK_INPUTS, numberOfInputs);
     }
 
     public int getNumberOfNetworkInputs() {
-        return config.getInteger(NUMBER_OF_NETWORK_INPUTS, 0);
+        return config.get(NUMBER_OF_NETWORK_INPUTS);
     }
 
     public void setNumberOfOutputs(int numberOfOutputs) {
-        config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+        config.set(NUMBER_OF_OUTPUTS, numberOfOutputs);
     }
 
     public int getNumberOfOutputs() {
-        return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+        return config.get(NUMBER_OF_OUTPUTS);
     }
 
     /** Sets the operator level non-chained outputs. */
@@ -518,11 +524,11 @@ public class StreamConfig implements Serializable {
     }
 
     public void setCheckpointMode(CheckpointingMode mode) {
-        config.setInteger(CHECKPOINT_MODE, mode.ordinal());
+        config.set(CHECKPOINT_MODE, mode.ordinal());
     }
 
     public CheckpointingMode getCheckpointMode() {
-        int ordinal = config.getInteger(CHECKPOINT_MODE, -1);
+        int ordinal = config.get(CHECKPOINT_MODE, -1);
         if (ordinal >= 0) {
             return CheckpointingMode.values()[ordinal];
         } else {
@@ -642,11 +648,11 @@ public class StreamConfig implements Serializable {
     }
 
     public void setChainIndex(int index) {
-        this.config.setInteger(CHAIN_INDEX, index);
+        this.config.set(CHAIN_INDEX, index);
     }
 
     public int getChainIndex() {
-        return this.config.getInteger(CHAIN_INDEX, 0);
+        return this.config.get(CHAIN_INDEX);
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 53822036e33..e19be728a3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -43,6 +43,8 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.BitSet;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
+
 /**
  * Tests that Flink can execute jobs with a higher parallelism than available 
number of slots. This
  * effectively tests that Flink can execute jobs with blocking results in a 
staged fashion.
@@ -108,12 +110,15 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
         final JobVertex sender = new JobVertex("Sender");
         sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
         sender.getConfiguration()
-                .setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, 
receiverParallelism);
+                .set(
+                        
getIntConfigOption(RoundRobinSubtaskIndexSender.CONFIG_KEY),
+                        receiverParallelism);
         sender.setParallelism(senderParallelism);
 
         final JobVertex receiver = new JobVertex("Receiver");
         receiver.setInvokableClass(SubtaskIndexReceiver.class);
-        
receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, 
senderParallelism);
+        receiver.getConfiguration()
+                .set(getIntConfigOption(SubtaskIndexReceiver.CONFIG_KEY), 
senderParallelism);
         receiver.setParallelism(receiverParallelism);
 
         receiver.connectNewDataSetAsInput(
@@ -138,7 +143,8 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
         public void invoke() throws Exception {
             RecordWriter<IntValue> writer =
                     new 
RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
-            final int numberOfTimesToSend = 
getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+            final int numberOfTimesToSend =
+                    getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 
0);
 
             final IntValue subtaskIndex =
                     new 
IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask());
@@ -173,7 +179,7 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
 
             try {
                 final int numberOfSubtaskIndexesToReceive =
-                        getTaskConfiguration().getInteger(CONFIG_KEY, 0);
+                        
getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 0);
                 final BitSet receivedSubtaskIndexes = new 
BitSet(numberOfSubtaskIndexesToReceive);
 
                 IntValue record;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
index 1c26a4e32d3..d1b24d862f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
@@ -383,7 +383,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
     @Test
     void testComputeSourceParallelismUpperBound() {
         Configuration configuration = new Configuration();
-        configuration.setInteger(
+        configuration.set(
                 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                 DEFAULT_SOURCE_PARALLELISM);
         VertexParallelismAndInputInfosDecider 
vertexParallelismAndInputInfosDecider =
@@ -408,7 +408,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
     @Test
     void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() {
         Configuration configuration = new Configuration();
-        configuration.setInteger(
+        configuration.set(
                 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                 VERTEX_MAX_PARALLELISM * 2);
         VertexParallelismAndInputInfosDecider 
vertexParallelismAndInputInfosDecider =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
index 97b12bf9509..dfa187736a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.types.Value;
 import java.util.Arrays;
 
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 
 /**
  * A factory for a {@link 
org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}.
@@ -101,9 +102,9 @@ public class RecordComparatorFactory implements 
TypeComparatorFactory<Record> {
         }
 
         // write the config
-        config.setInteger(NUM_KEYS, this.positions.length);
+        config.set(getIntConfigOption(NUM_KEYS), this.positions.length);
         for (int i = 0; i < this.positions.length; i++) {
-            config.setInteger(KEY_POS_PREFIX + i, this.positions[i]);
+            config.set(getIntConfigOption(KEY_POS_PREFIX + i), 
this.positions[i]);
             config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName());
             config.set(
                     getBooleanConfigOption(KEY_SORT_DIRECTION_PREFIX + i), 
this.sortDirections[i]);
@@ -115,7 +116,7 @@ public class RecordComparatorFactory implements 
TypeComparatorFactory<Record> {
     public void readParametersFromConfig(Configuration config, ClassLoader cl)
             throws ClassNotFoundException {
         // figure out how many key fields there are
-        final int numKeyFields = config.getInteger(NUM_KEYS, -1);
+        final int numKeyFields = config.get(getIntConfigOption(NUM_KEYS), -1);
         if (numKeyFields < 0) {
             throw new IllegalConfigurationException(
                     "The number of keys for the comparator is invalid: " + 
numKeyFields);
@@ -128,7 +129,7 @@ public class RecordComparatorFactory implements 
TypeComparatorFactory<Record> {
         // read the individual key positions and types
         for (int i = 0; i < numKeyFields; i++) {
             // next key position
-            final int p = config.getInteger(KEY_POS_PREFIX + i, -1);
+            final int p = config.get(getIntConfigOption(KEY_POS_PREFIX + i), 
-1);
             if (p >= 0) {
                 positions[i] = p;
             } else {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 9e769c99379..1ed6867d100 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.junit.Assert.fail;
 
 /** Manual test to evaluate impact of checkpointing on latency. */
@@ -53,8 +54,8 @@ public class StreamingScalabilityAndLatency {
             config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("80m"));
             config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
20000);
 
-            config.setInteger("taskmanager.net.server.numThreads", 1);
-            config.setInteger("taskmanager.net.client.numThreads", 1);
+            
config.set(getIntConfigOption("taskmanager.net.server.numThreads"), 1);
+            
config.set(getIntConfigOption("taskmanager.net.client.numThreads"), 1);
 
             cluster =
                     new MiniClusterWithClientResource(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
index 0c2b2c716f7..ecb3bed4702 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 import static org.apache.flink.test.util.TestBaseUtils.compareResultAsText;
 import static org.apache.flink.test.util.TestBaseUtils.compareResultAsTuples;
 
@@ -512,7 +513,7 @@ public class MapITCase extends 
MultipleProgramsTestBaseJUnit4 {
 
         DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
         Configuration conf = new Configuration();
-        conf.setInteger(TEST_KEY, TEST_VALUE);
+        conf.set(getIntConfigOption(TEST_KEY), TEST_VALUE);
         DataSet<Tuple3<Integer, Long, String>> bcMapDs =
                 ds.map(new RichMapper2()).withParameters(conf);
         List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 8f85add350d..545f6e048d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
 
 /** Manually test the throughput of the network stack. */
 public class NetworkStackThroughputITCase extends TestLogger {
@@ -90,8 +91,11 @@ public class NetworkStackThroughputITCase extends TestLogger 
{
                 // Determine the amount of data to send per subtask
                 int dataVolumeGb =
                         getTaskConfiguration()
-                                .getInteger(
-                                        
NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
+                                .get(
+                                        getIntConfigOption(
+                                                NetworkStackThroughputITCase
+                                                        
.DATA_VOLUME_GB_CONFIG_KEY),
+                                        1);
 
                 long dataMbPerSubtask = (dataVolumeGb * 10) / 
getCurrentNumberOfSubtasks();
                 long numRecordsToEmit =
@@ -331,7 +335,8 @@ public class NetworkStackThroughputITCase extends 
TestLogger {
 
         producer.setInvokableClass(SpeedTestProducer.class);
         producer.setParallelism(numSubtasks);
-        producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, 
dataVolumeGb);
+        producer.getConfiguration()
+                .set(getIntConfigOption(DATA_VOLUME_GB_CONFIG_KEY), 
dataVolumeGb);
         producer.getConfiguration()
                 .set(getBooleanConfigOption(IS_SLOW_SENDER_CONFIG_KEY), 
isSlowSender);
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index bc65a5ee93d..20548c619ea 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -915,15 +915,15 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
             // activate re-execution of failed applications
             appContext.setMaxAppAttempts(
-                    configuration.getInteger(
-                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
+                    configuration.get(
+                            YarnConfigOptions.APPLICATION_ATTEMPTS,
                             YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
 
             activateHighAvailabilitySupport(appContext);
         } else {
             // set number of application retries to 1 in the default case
             appContext.setMaxAppAttempts(
-                    
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
+                    configuration.get(YarnConfigOptions.APPLICATION_ATTEMPTS, 
1));
         }
 
         final Set<Path> userJarFiles = new HashSet<>();

Reply via email to