This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch config-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c804a587242054b3ee919e00884a2f53162f603
Author: sxnan <[email protected]>
AuthorDate: Tue Sep 10 11:59:12 2024 +0800

    [FLINK-34079][config] Refactor callers that use get/setLong
---
 .../file/table/stream/StreamingFileWriterTest.java |  4 +-
 .../flink/api/common/io/BinaryInputFormat.java     |  6 ++-
 .../flink/api/common/io/BinaryOutputFormat.java    |  5 +-
 .../flink/configuration/ConfigurationUtils.java    |  4 ++
 .../flink/api/common/io/BinaryInputFormatTest.java |  5 +-
 .../ConfigurationConversionsTest.java              | 55 +++++++++++++---------
 .../flink/configuration/ConfigurationTest.java     |  5 +-
 .../configuration/DelegatingConfigurationTest.java |  8 ++--
 .../api/common/io/SequentialFormatTestBase.java    |  4 +-
 .../decorators/InitTaskManagerDecoratorTest.java   |  9 ++--
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 11 +++--
 .../flink/streaming/api/graph/StreamConfig.java    |  7 +--
 .../TaskExecutorProcessUtilsTest.java              |  9 ++--
 .../ExternalResourceUtilsTest.java                 | 22 +++++----
 .../resourcemanager/WorkerResourceSpecTest.java    |  9 ++--
 .../apache/flink/runtime/taskmanager/TaskTest.java |  4 +-
 16 files changed, 107 insertions(+), 60 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
index 7d58d36ccbc..f5562f1d196 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
@@ -384,7 +384,7 @@ class StreamingFileWriterTest {
         configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
         
configuration.setString(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), 
"yyyy-MM-dd");
         configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), 
"partition-time");
-        configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
+        configuration.set(SINK_PARTITION_COMMIT_DELAY, 
Duration.ofMillis(commitDelay));
         
configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
         return configuration;
     }
@@ -393,7 +393,7 @@ class StreamingFileWriterTest {
         Configuration configuration = new Configuration();
         configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
         configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), 
"process-time");
-        configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
+        configuration.set(SINK_PARTITION_COMMIT_DELAY, 
Duration.ofMillis(commitDelay));
         
configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
         return configuration;
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 6d7c7fcdbb9..b9ed4f1da71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -42,6 +42,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
+
 /**
  * Base class for all input formats that use blocks of fixed size. The input 
splits are aligned to
  * these blocks, meaning that each split will consist of one block. Without 
configuration, these
@@ -90,7 +92,9 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T>
         // overwriting the value set by the setter
 
         if (this.blockSize == NATIVE_BLOCK_SIZE) {
-            long blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, 
NATIVE_BLOCK_SIZE);
+            long blockSize =
+                    parameters.get(
+                            getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), 
NATIVE_BLOCK_SIZE);
             setBlockSize(blockSize);
         }
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index f2dc9ca8883..cddeb40c163 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -27,6 +27,8 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
+
 @Public
 public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 
@@ -63,7 +65,8 @@ public abstract class BinaryOutputFormat<T> extends 
FileOutputFormat<T> {
         super.configure(parameters);
 
         // read own parameters
-        this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, 
NATIVE_BLOCK_SIZE);
+        this.blockSize =
+                parameters.get(getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), 
NATIVE_BLOCK_SIZE);
         if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) {
             throw new IllegalArgumentException(
                     "The block size parameter must be set and larger than 0.");
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index f561b77cab9..f68c4da20d7 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -707,4 +707,8 @@ public class ConfigurationUtils {
     public static ConfigOption<Integer> getIntConfigOption(String key) {
         return ConfigOptions.key(key).intType().noDefaultValue();
     }
+
+    public static ConfigOption<Long> getLongConfigOption(String key) {
+        return ConfigOptions.key(key).longType().noDefaultValue();
+    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index ab4791f88f2..9add3efd7ec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -33,6 +33,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Path;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 
@@ -67,7 +68,7 @@ class BinaryInputFormatTest {
                         "test_create_input_splits_with_one_file", blockSize, 
numBlocks);
 
         final Configuration config = new Configuration();
-        config.setLong("input.block_size", blockSize + 10);
+        config.set(getLongConfigOption("input.block_size"), blockSize + 10L);
 
         final BinaryInputFormat<Record> inputFormat = new 
MyBinaryInputFormat();
         inputFormat.setFilePath(tempFile.toURI().toString());
@@ -184,7 +185,7 @@ class BinaryInputFormatTest {
                         "test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
 
         final Configuration config = new Configuration();
-        config.setLong("input.block_size", blockSize + 10);
+        config.set(getLongConfigOption("input.block_size"), blockSize + 10L);
 
         final BinaryInputFormat<Record> inputFormat = new 
MyBinaryInputFormat();
         inputFormat.setFilePath(tempFile.toURI().toString());
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
index d96bcd85e73..cf59fc0ccbc 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java
@@ -38,6 +38,7 @@ import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfig
 import static 
org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -62,8 +63,8 @@ class ConfigurationConversionsTest {
         pc = new Configuration();
 
         pc.set(getIntConfigOption("int"), 5);
-        pc.setLong("long", 15);
-        pc.setLong("too_long", TOO_LONG);
+        pc.set(getLongConfigOption("long"), 15L);
+        pc.set(getLongConfigOption("too_long"), TOO_LONG);
         pc.set(getFloatConfigOption("float"), 2.1456775f);
         pc.set(getDoubleConfigOption("double"), Math.PI);
         pc.set(getDoubleConfigOption("negative_double"), -1.0);
@@ -79,7 +80,7 @@ class ConfigurationConversionsTest {
         return Arrays.asList(
                 // from integer
                 TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("int"), 0)).expect(5),
-                TestSpec.whenAccessed(conf -> conf.getLong("int", 
0)).expect(5L),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("int"), 0L)).expect(5L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("int"), 0f)).expect(5f),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("int"), 0.0))
                         .expect(5.0),
@@ -91,7 +92,8 @@ class ConfigurationConversionsTest {
 
                 // from long
                 TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("long"), 0)).expect(15),
-                TestSpec.whenAccessed(conf -> conf.getLong("long", 
0)).expect(15L),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("long"), 0L))
+                        .expect(15L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("long"), 0f))
                         .expect(15f),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("long"), 0.0))
@@ -106,7 +108,8 @@ class ConfigurationConversionsTest {
                 // from too long
                 TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("too_long"), 0))
                         .expectException("Could not parse value '2147483657' 
for key 'too_long'."),
-                TestSpec.whenAccessed(conf -> conf.getLong("too_long", 
0)).expect(TOO_LONG),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("too_long"), 0L))
+                        .expect(TOO_LONG),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("too_long"), 0f))
                         .expect((float) TOO_LONG),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("too_long"), 0.0))
@@ -124,9 +127,10 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value '2.1456776' for key 
'float'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> conf.getLong("float", 0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("float"), 0L))
                         .expectException(
-                                "For input string: \"2.1456776\"", 
NumberFormatException.class),
+                                "Could not parse value '2.1456776' for key 
'float'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("float"), 0f))
                         .expect(2.1456775f),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("float"), 0.0))
@@ -147,10 +151,10 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value '3.141592653589793' for 
key 'double'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> conf.getLong("double", 0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("double"), 0L))
                         .expectException(
-                                "For input string: \"3.141592653589793\"",
-                                NumberFormatException.class),
+                                "Could not parse value '3.141592653589793' for 
key 'double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("double"), 0f))
                         .expect(new IsCloseTo(3.141592f, 0.000001f)),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("double"), 0.0))
@@ -169,8 +173,10 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value '-1.0' for key 
'negative_double'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> conf.getLong("negative_double", 
0))
-                        .expectException("For input string: \"-1.0\"", 
NumberFormatException.class),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("negative_double"), 0L))
+                        .expectException(
+                                "Could not parse value '-1.0' for key 
'negative_double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("negative_double"), 0f))
                         .expect(new IsCloseTo(-1f, 0.000001f)),
                 TestSpec.whenAccessed(
@@ -190,8 +196,10 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value '0.0' for key 'zero'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> conf.getLong("zero", 0))
-                        .expectException("For input string: \"0.0\"", 
NumberFormatException.class),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("zero"), 0L))
+                        .expectException(
+                                "Could not parse value '0.0' for key 'zero'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("zero"), 0f))
                         .expect(new IsCloseTo(0f, 0.000001f)),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("zero"), 0.0))
@@ -209,10 +217,10 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value 
'1.7976931348623157E308' for key 'too_long_double'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> conf.getLong("too_long_double", 
0))
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("too_long_double"), 0L))
                         .expectException(
-                                "For input string: \"1.7976931348623157E308\"",
-                                NumberFormatException.class),
+                                "Could not parse value 
'1.7976931348623157E308' for key 'too_long_double'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("too_long_double"), 0f))
                         .expectException(
                                 "Could not parse value 
'1.7976931348623157E308' for key 'too_long_double'."),
@@ -231,7 +239,8 @@ class ConfigurationConversionsTest {
 
                 // from string
                 TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("string"), 0)).expect(42),
-                TestSpec.whenAccessed(conf -> conf.getLong("string", 
0)).expect(42L),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("string"), 0L))
+                        .expect(42L),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("string"), 0f))
                         .expect(42f),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("string"), 0.0))
@@ -249,9 +258,11 @@ class ConfigurationConversionsTest {
                         .expectException(
                                 "Could not parse value 'bcdefg&&' for key 
'non_convertible_string'.",
                                 IllegalArgumentException.class),
-                TestSpec.whenAccessed(conf -> 
conf.getLong("non_convertible_string", 0))
+                TestSpec.whenAccessed(
+                                conf -> 
conf.get(getLongConfigOption("non_convertible_string"), 0L))
                         .expectException(
-                                "For input string: \"bcdefg&&\"", 
NumberFormatException.class),
+                                "Could not parse value 'bcdefg&&' for key 
'non_convertible_string'.",
+                                IllegalArgumentException.class),
                 TestSpec.whenAccessed(
                                 conf ->
                                         conf.get(
@@ -283,8 +294,8 @@ class ConfigurationConversionsTest {
                 // from boolean
                 TestSpec.whenAccessed(conf -> 
conf.get(getIntConfigOption("boolean"), 0))
                         .expectException("Could not parse value 'true' for key 
'boolean'."),
-                TestSpec.whenAccessed(conf -> conf.getLong("boolean", 0))
-                        .expectException("For input string: \"true\""),
+                TestSpec.whenAccessed(conf -> 
conf.get(getLongConfigOption("boolean"), 0L))
+                        .expectException("Could not parse value 'true' for key 
'boolean'."),
                 TestSpec.whenAccessed(conf -> 
conf.get(getFloatConfigOption("boolean"), 0f))
                         .expectException("Could not parse value 'true' for key 
'boolean'."),
                 TestSpec.whenAccessed(conf -> 
conf.get(getDoubleConfigOption("boolean"), 0.0))
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 6363d748d5e..f0ddec497e2 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -41,6 +41,7 @@ import static 
org.apache.flink.configuration.ConfigurationUtils.getBooleanConfig
 import static 
org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
 import static 
org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -88,7 +89,7 @@ class ConfigurationTest {
         final Configuration orig = new Configuration(standardYaml);
         orig.setString("mykey", "myvalue");
         orig.set(getIntConfigOption("mynumber"), 100);
-        orig.setLong("longvalue", 478236947162389746L);
+        orig.set(getLongConfigOption("longvalue"), 478236947162389746L);
         orig.set(getFloatConfigOption("PI"), 3.1415926f);
         orig.set(getDoubleConfigOption("E"), Math.E);
         orig.set(getBooleanConfigOption("shouldbetrue"), true);
@@ -97,7 +98,7 @@ class ConfigurationTest {
         final Configuration copy = InstantiationUtil.createCopyWritable(orig);
         assertThat("myvalue").isEqualTo(copy.getString("mykey", "null"));
         assertThat(copy.get(getIntConfigOption("mynumber"), 0)).isEqualTo(100);
-        assertThat(copy.getLong("longvalue", 
0L)).isEqualTo(478236947162389746L);
+        assertThat(copy.get(getLongConfigOption("longvalue"), 
0L)).isEqualTo(478236947162389746L);
         assertThat(copy.get(getFloatConfigOption("PI"), 3.1415926f))
                 .isCloseTo(3.1415926f, Offset.offset(0.0f));
         assertThat(copy.get(getDoubleConfigOption("E"), 
0.0)).isCloseTo(Math.E, Offset.offset(0.0));
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index b7073cd381e..9ff2719ebf8 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -186,11 +186,11 @@ class DelegatingConfigurationTest {
 
         // Test for long
         ConfigOption<Long> longOption = 
ConfigOptions.key("long.key").longType().noDefaultValue();
-        original.setLong(longOption, 10L);
-        assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(11L);
+        original.set(longOption, 10L);
         assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L);
-        delegatingConf.setLong(longOption, 12L);
-        assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(12L);
+        assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L);
+        delegatingConf.set(longOption, 12L);
+        assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L);
         assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L);
 
         // Test for boolean
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 048a1d8cec9..57560504f69 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test base for {@link BinaryInputFormat} and {@link BinaryOutputFormat}. */
@@ -179,7 +180,8 @@ public abstract class SequentialFormatTestBase<T> {
         this.tempFile = File.createTempFile("BinaryInputFormat", null);
         this.tempFile.deleteOnExit();
         Configuration configuration = new Configuration();
-        configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, 
this.blockSize);
+        configuration.set(
+                
getLongConfigOption(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY), 
this.blockSize);
         if (this.parallelism == 1) {
             BinaryOutputFormat<T> output =
                     createOutputFormat(this.tempFile.toURI().toString(), 
configuration);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index 526783a67b4..4caab5c57bf 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** General tests for the {@link InitJobManagerDecorator}. */
@@ -90,9 +91,11 @@ class InitTaskManagerDecoratorTest extends 
KubernetesTaskManagerTestBase {
 
         // Set up external resource configs
         
flinkConfig.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
RESOURCE_NAME);
-        flinkConfig.setLong(
-                
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
-                        RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX),
+        flinkConfig.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
+                                RESOURCE_NAME,
+                                
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)),
                 RESOURCE_AMOUNT);
         flinkConfig.setString(
                 
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 419d805a63d..99c2fb75bd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobStatusHook;
 import org.apache.flink.core.fs.Path;
@@ -68,7 +70,10 @@ public class JobGraph implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = 
"initialClientHeartbeatTimeout";
+    private static final ConfigOption<Long> INITIAL_CLIENT_HEARTBEAT_TIMEOUT =
+            ConfigOptions.key("initialClientHeartbeatTimeout")
+                    .longType()
+                    .defaultValue(Long.MIN_VALUE);
 
     // --- job and configuration ---
 
@@ -656,10 +661,10 @@ public class JobGraph implements Serializable {
     }
 
     public void setInitialClientHeartbeatTimeout(long 
initialClientHeartbeatTimeout) {
-        jobConfiguration.setLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, 
initialClientHeartbeatTimeout);
+        jobConfiguration.set(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, 
initialClientHeartbeatTimeout);
     }
 
     public long getInitialClientHeartbeatTimeout() {
-        return jobConfiguration.getLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, 
Long.MIN_VALUE);
+        return jobConfiguration.get(INITIAL_CLIENT_HEARTBEAT_TIMEOUT);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index c0e4214d41e..b44c5a25553 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -100,7 +100,8 @@ public class StreamConfig implements Serializable {
     private static final String INPUTS = "inputs";
     private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
     private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = 
"typeSerializer_sideout_";
-    private static final String ITERATON_WAIT = "iterationWait";
+    private static final ConfigOption<Long> ITERATON_WAIT =
+            ConfigOptions.key("iterationWait").longType().defaultValue(0L);
     private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs";
     private static final String VERTEX_NONCHAINED_OUTPUTS = 
"vertexNonChainedOutputs";
     private static final String IN_STREAM_EDGES = "inStreamEdges";
@@ -447,11 +448,11 @@ public class StreamConfig implements Serializable {
     }
 
     public void setIterationWaitTime(long time) {
-        config.setLong(ITERATON_WAIT, time);
+        config.set(ITERATON_WAIT, time);
     }
 
     public long getIterationWaitTime() {
-        return config.getLong(ITERATON_WAIT, 0);
+        return config.get(ITERATON_WAIT);
     }
 
     public void setNumberOfNetworkInputs(int numberOfInputs) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 257c309e3a4..d8e91b6b3ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.function.Consumer;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_LEGACY_HEAP_OPTIONS;
 import static 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -635,9 +636,11 @@ class TaskExecutorProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<TaskExecut
         final Configuration config = new Configuration();
         config.setString(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
EXTERNAL_RESOURCE_NAME_1);
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME_1),
-                1);
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(
+                                EXTERNAL_RESOURCE_NAME_1)),
+                1L);
         config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(4096));
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
index f3895a3e5f5..5f3b38060b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.core.Is.is;
@@ -228,8 +229,9 @@ public class ExternalResourceUtilsTest extends TestLogger {
         config.set(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
                 Collections.singletonList(RESOURCE_NAME_1));
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)),
                 RESOURCE_AMOUNT_1);
 
         final Map<String, Long> externalResourceAmountMap =
@@ -246,8 +248,10 @@ public class ExternalResourceUtilsTest extends TestLogger {
         config.set(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
                 Collections.singletonList(RESOURCE_NAME_1));
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), 0);
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)),
+                0L);
 
         final Map<String, Long> externalResourceAmountMap =
                 ExternalResourceUtils.getExternalResourceAmountMap(config);
@@ -261,8 +265,9 @@ public class ExternalResourceUtilsTest extends TestLogger {
         config.set(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
                 Collections.singletonList(RESOURCE_NAME_1));
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)),
                 RESOURCE_AMOUNT_1);
 
         final Collection<ExternalResource> externalResources =
@@ -279,8 +284,9 @@ public class ExternalResourceUtilsTest extends TestLogger {
         final Configuration config = new Configuration();
         config.setString(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
ExternalResourceOptions.NONE);
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)),
                 RESOURCE_AMOUNT_1);
 
         final Collection<ExternalResource> externalResources =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
index 79d458073e6..0bd1d4264fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link WorkerResourceSpec}. */
@@ -267,9 +268,11 @@ class WorkerResourceSpecTest {
         final Configuration config = new Configuration();
         config.setString(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
EXTERNAL_RESOURCE_NAME);
-        config.setLong(
-                
ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME),
-                1);
+        config.set(
+                getLongConfigOption(
+                        
ExternalResourceOptions.getAmountConfigOptionForResource(
+                                EXTERNAL_RESOURCE_NAME)),
+                1L);
 
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.newProcessSpecBuilder(config)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4aae1d19414..45dfcd32c0d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -917,8 +917,8 @@ public class TaskTest extends TestLogger {
         final TaskManagerActions taskManagerActions = new 
ProhibitFatalErrorTaskManagerActions();
 
         final Configuration config = new Configuration();
-        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
-        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 
* 1000);
+        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 
Duration.ofMillis(5));
+        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 
Duration.ofMillis(60 * 1000));
 
         final Task task =
                 createTaskBuilder()


Reply via email to