This is an automated email from the ASF dual-hosted git repository. sxnan pushed a commit to branch config-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9c804a587242054b3ee919e00884a2f53162f603 Author: sxnan <[email protected]> AuthorDate: Tue Sep 10 11:59:12 2024 +0800 [FLINK-34079][config] Refactor callers that use get/setLong --- .../file/table/stream/StreamingFileWriterTest.java | 4 +- .../flink/api/common/io/BinaryInputFormat.java | 6 ++- .../flink/api/common/io/BinaryOutputFormat.java | 5 +- .../flink/configuration/ConfigurationUtils.java | 4 ++ .../flink/api/common/io/BinaryInputFormatTest.java | 5 +- .../ConfigurationConversionsTest.java | 55 +++++++++++++--------- .../flink/configuration/ConfigurationTest.java | 5 +- .../configuration/DelegatingConfigurationTest.java | 8 ++-- .../api/common/io/SequentialFormatTestBase.java | 4 +- .../decorators/InitTaskManagerDecoratorTest.java | 9 ++-- .../apache/flink/runtime/jobgraph/JobGraph.java | 11 +++-- .../flink/streaming/api/graph/StreamConfig.java | 7 +-- .../TaskExecutorProcessUtilsTest.java | 9 ++-- .../ExternalResourceUtilsTest.java | 22 +++++---- .../resourcemanager/WorkerResourceSpecTest.java | 9 ++-- .../apache/flink/runtime/taskmanager/TaskTest.java | 4 +- 16 files changed, 107 insertions(+), 60 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java index 7d58d36ccbc..f5562f1d196 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java @@ -384,7 +384,7 @@ class StreamingFileWriterTest { configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); configuration.setString(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd"); configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time"); - configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay); + configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay)); configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC"); return configuration; } @@ -393,7 +393,7 @@ class StreamingFileWriterTest { Configuration configuration = new Configuration(); configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time"); - configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay); + configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay)); configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC"); return configuration; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 6d7c7fcdbb9..b9ed4f1da71 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -42,6 +42,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; + /** * Base class for all input formats that use blocks of fixed size. The input splits are aligned to * these blocks, meaning that each split will consist of one block. Without configuration, these @@ -90,7 +92,9 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> // overwriting the value set by the setter if (this.blockSize == NATIVE_BLOCK_SIZE) { - long blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE); + long blockSize = + parameters.get( + getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE); setBlockSize(blockSize); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java index f2dc9ca8883..cddeb40c163 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java @@ -27,6 +27,8 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; + @Public public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> { @@ -63,7 +65,8 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> { super.configure(parameters); // read own parameters - this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE); + this.blockSize = + parameters.get(getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE); if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) { throw new IllegalArgumentException( "The block size parameter must be set and larger than 0."); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index f561b77cab9..f68c4da20d7 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -707,4 +707,8 @@ public class ConfigurationUtils { public static ConfigOption<Integer> getIntConfigOption(String key) { return ConfigOptions.key(key).intType().noDefaultValue(); } + + public static ConfigOption<Long> getLongConfigOption(String key) { + return ConfigOptions.key(key).longType().noDefaultValue(); + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java index ab4791f88f2..9add3efd7ec 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java @@ -33,6 +33,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Path; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; @@ -67,7 +68,7 @@ class BinaryInputFormatTest { "test_create_input_splits_with_one_file", blockSize, numBlocks); final Configuration config = new Configuration(); - config.setLong("input.block_size", blockSize + 10); + config.set(getLongConfigOption("input.block_size"), blockSize + 10L); final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat(); inputFormat.setFilePath(tempFile.toURI().toString()); @@ -184,7 +185,7 @@ class BinaryInputFormatTest { "test_create_input_splits_with_empty_split", blockSize, numBlocks); final Configuration config = new Configuration(); - config.setLong("input.block_size", blockSize + 10); + config.set(getLongConfigOption("input.block_size"), blockSize + 10L); final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat(); inputFormat.setFilePath(tempFile.toURI().toString()); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java index d96bcd85e73..cf59fc0ccbc 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java @@ -38,6 +38,7 @@ import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfig import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -62,8 +63,8 @@ class ConfigurationConversionsTest { pc = new Configuration(); pc.set(getIntConfigOption("int"), 5); - pc.setLong("long", 15); - pc.setLong("too_long", TOO_LONG); + pc.set(getLongConfigOption("long"), 15L); + pc.set(getLongConfigOption("too_long"), TOO_LONG); pc.set(getFloatConfigOption("float"), 2.1456775f); pc.set(getDoubleConfigOption("double"), Math.PI); pc.set(getDoubleConfigOption("negative_double"), -1.0); @@ -79,7 +80,7 @@ class ConfigurationConversionsTest { return Arrays.asList( // from integer TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("int"), 0)).expect(5), - TestSpec.whenAccessed(conf -> conf.getLong("int", 0)).expect(5L), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("int"), 0L)).expect(5L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("int"), 0f)).expect(5f), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("int"), 0.0)) .expect(5.0), @@ -91,7 +92,8 @@ class ConfigurationConversionsTest { // from long TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("long"), 0)).expect(15), - TestSpec.whenAccessed(conf -> conf.getLong("long", 0)).expect(15L), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("long"), 0L)) + .expect(15L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("long"), 0f)) .expect(15f), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("long"), 0.0)) @@ -106,7 +108,8 @@ class ConfigurationConversionsTest { // from too long TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("too_long"), 0)) .expectException("Could not parse value '2147483657' for key 'too_long'."), - TestSpec.whenAccessed(conf -> conf.getLong("too_long", 0)).expect(TOO_LONG), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("too_long"), 0L)) + .expect(TOO_LONG), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("too_long"), 0f)) .expect((float) TOO_LONG), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("too_long"), 0.0)) @@ -124,9 +127,10 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value '2.1456776' for key 'float'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("float", 0)) + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("float"), 0L)) .expectException( - "For input string: \"2.1456776\"", NumberFormatException.class), + "Could not parse value '2.1456776' for key 'float'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("float"), 0f)) .expect(2.1456775f), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("float"), 0.0)) @@ -147,10 +151,10 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value '3.141592653589793' for key 'double'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("double"), 0L)) .expectException( - "For input string: \"3.141592653589793\"", - NumberFormatException.class), + "Could not parse value '3.141592653589793' for key 'double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("double"), 0f)) .expect(new IsCloseTo(3.141592f, 0.000001f)), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("double"), 0.0)) @@ -169,8 +173,10 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value '-1.0' for key 'negative_double'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("negative_double", 0)) - .expectException("For input string: \"-1.0\"", NumberFormatException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("negative_double"), 0L)) + .expectException( + "Could not parse value '-1.0' for key 'negative_double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("negative_double"), 0f)) .expect(new IsCloseTo(-1f, 0.000001f)), TestSpec.whenAccessed( @@ -190,8 +196,10 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value '0.0' for key 'zero'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("zero", 0)) - .expectException("For input string: \"0.0\"", NumberFormatException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("zero"), 0L)) + .expectException( + "Could not parse value '0.0' for key 'zero'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("zero"), 0f)) .expect(new IsCloseTo(0f, 0.000001f)), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("zero"), 0.0)) @@ -209,10 +217,10 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value '1.7976931348623157E308' for key 'too_long_double'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("too_long_double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("too_long_double"), 0L)) .expectException( - "For input string: \"1.7976931348623157E308\"", - NumberFormatException.class), + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'.", + IllegalArgumentException.class), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("too_long_double"), 0f)) .expectException( "Could not parse value '1.7976931348623157E308' for key 'too_long_double'."), @@ -231,7 +239,8 @@ class ConfigurationConversionsTest { // from string TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("string"), 0)).expect(42), - TestSpec.whenAccessed(conf -> conf.getLong("string", 0)).expect(42L), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("string"), 0L)) + .expect(42L), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("string"), 0f)) .expect(42f), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("string"), 0.0)) @@ -249,9 +258,11 @@ class ConfigurationConversionsTest { .expectException( "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", IllegalArgumentException.class), - TestSpec.whenAccessed(conf -> conf.getLong("non_convertible_string", 0)) + TestSpec.whenAccessed( + conf -> conf.get(getLongConfigOption("non_convertible_string"), 0L)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), TestSpec.whenAccessed( conf -> conf.get( @@ -283,8 +294,8 @@ class ConfigurationConversionsTest { // from boolean TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("boolean"), 0)) .expectException("Could not parse value 'true' for key 'boolean'."), - TestSpec.whenAccessed(conf -> conf.getLong("boolean", 0)) - .expectException("For input string: \"true\""), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("boolean"), 0L)) + .expectException("Could not parse value 'true' for key 'boolean'."), TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("boolean"), 0f)) .expectException("Could not parse value 'true' for key 'boolean'."), TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("boolean"), 0.0)) diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index 6363d748d5e..f0ddec497e2 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -41,6 +41,7 @@ import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfig import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -88,7 +89,7 @@ class ConfigurationTest { final Configuration orig = new Configuration(standardYaml); orig.setString("mykey", "myvalue"); orig.set(getIntConfigOption("mynumber"), 100); - orig.setLong("longvalue", 478236947162389746L); + orig.set(getLongConfigOption("longvalue"), 478236947162389746L); orig.set(getFloatConfigOption("PI"), 3.1415926f); orig.set(getDoubleConfigOption("E"), Math.E); orig.set(getBooleanConfigOption("shouldbetrue"), true); @@ -97,7 +98,7 @@ class ConfigurationTest { final Configuration copy = InstantiationUtil.createCopyWritable(orig); assertThat("myvalue").isEqualTo(copy.getString("mykey", "null")); assertThat(copy.get(getIntConfigOption("mynumber"), 0)).isEqualTo(100); - assertThat(copy.getLong("longvalue", 0L)).isEqualTo(478236947162389746L); + assertThat(copy.get(getLongConfigOption("longvalue"), 0L)).isEqualTo(478236947162389746L); assertThat(copy.get(getFloatConfigOption("PI"), 3.1415926f)) .isCloseTo(3.1415926f, Offset.offset(0.0f)); assertThat(copy.get(getDoubleConfigOption("E"), 0.0)).isCloseTo(Math.E, Offset.offset(0.0)); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java index b7073cd381e..9ff2719ebf8 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -186,11 +186,11 @@ class DelegatingConfigurationTest { // Test for long ConfigOption<Long> longOption = ConfigOptions.key("long.key").longType().noDefaultValue(); - original.setLong(longOption, 10L); - assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(11L); + original.set(longOption, 10L); assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L); - delegatingConf.setLong(longOption, 12L); - assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(12L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L); + delegatingConf.set(longOption, 12L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L); assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L); // Test for boolean diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java index 048a1d8cec9..57560504f69 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Test base for {@link BinaryInputFormat} and {@link BinaryOutputFormat}. */ @@ -179,7 +180,8 @@ public abstract class SequentialFormatTestBase<T> { this.tempFile = File.createTempFile("BinaryInputFormat", null); this.tempFile.deleteOnExit(); Configuration configuration = new Configuration(); - configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); + configuration.set( + getLongConfigOption(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY), this.blockSize); if (this.parallelism == 1) { BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(), configuration); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java index 526783a67b4..4caab5c57bf 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** General tests for the {@link InitJobManagerDecorator}. */ @@ -90,9 +91,11 @@ class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase { // Set up external resource configs flinkConfig.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), RESOURCE_NAME); - flinkConfig.setLong( - ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( - RESOURCE_NAME, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), + flinkConfig.set( + getLongConfigOption( + ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( + RESOURCE_NAME, + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)), RESOURCE_AMOUNT); flinkConfig.setString( ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 419d805a63d..99c2fb75bd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobStatusHook; import org.apache.flink.core.fs.Path; @@ -68,7 +70,10 @@ public class JobGraph implements Serializable { private static final long serialVersionUID = 1L; - private static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = "initialClientHeartbeatTimeout"; + private static final ConfigOption<Long> INITIAL_CLIENT_HEARTBEAT_TIMEOUT = + ConfigOptions.key("initialClientHeartbeatTimeout") + .longType() + .defaultValue(Long.MIN_VALUE); // --- job and configuration --- @@ -656,10 +661,10 @@ public class JobGraph implements Serializable { } public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) { - jobConfiguration.setLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, initialClientHeartbeatTimeout); + jobConfiguration.set(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, initialClientHeartbeatTimeout); } public long getInitialClientHeartbeatTimeout() { - return jobConfiguration.getLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, Long.MIN_VALUE); + return jobConfiguration.get(INITIAL_CLIENT_HEARTBEAT_TIMEOUT); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index c0e4214d41e..b44c5a25553 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -100,7 +100,8 @@ public class StreamConfig implements Serializable { private static final String INPUTS = "inputs"; private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out"; private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_"; - private static final String ITERATON_WAIT = "iterationWait"; + private static final ConfigOption<Long> ITERATON_WAIT = + ConfigOptions.key("iterationWait").longType().defaultValue(0L); private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs"; private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs"; private static final String IN_STREAM_EDGES = "inStreamEdges"; @@ -447,11 +448,11 @@ public class StreamConfig implements Serializable { } public void setIterationWaitTime(long time) { - config.setLong(ITERATON_WAIT, time); + config.set(ITERATON_WAIT, time); } public long getIterationWaitTime() { - return config.getLong(ITERATON_WAIT, 0); + return config.get(ITERATON_WAIT); } public void setNumberOfNetworkInputs(int numberOfInputs) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java index 257c309e3a4..d8e91b6b3ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java @@ -37,6 +37,7 @@ import java.util.Arrays; import java.util.Map; import java.util.function.Consumer; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_LEGACY_HEAP_OPTIONS; import static org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS; import static org.assertj.core.api.Assertions.assertThat; @@ -635,9 +636,11 @@ class TaskExecutorProcessUtilsTest extends ProcessMemoryUtilsTestBase<TaskExecut final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME_1); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME_1), - 1); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource( + EXTERNAL_RESOURCE_NAME_1)), + 1L); config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096)); final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(config); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java index f3895a3e5f5..5f3b38060b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.core.Is.is; @@ -228,8 +229,9 @@ public class ExternalResourceUtilsTest extends TestLogger { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Map<String, Long> externalResourceAmountMap = @@ -246,8 +248,10 @@ public class ExternalResourceUtilsTest extends TestLogger { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), 0); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), + 0L); final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config); @@ -261,8 +265,9 @@ public class ExternalResourceUtilsTest extends TestLogger { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Collection<ExternalResource> externalResources = @@ -279,8 +284,9 @@ public class ExternalResourceUtilsTest extends TestLogger { final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), ExternalResourceOptions.NONE); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Collection<ExternalResource> externalResources = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java index 79d458073e6..0bd1d4264fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.junit.jupiter.api.Test; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link WorkerResourceSpec}. */ @@ -267,9 +268,11 @@ class WorkerResourceSpecTest { final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME), - 1); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource( + EXTERNAL_RESOURCE_NAME)), + 1L); final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.newProcessSpecBuilder(config) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 4aae1d19414..45dfcd32c0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -917,8 +917,8 @@ public class TaskTest extends TestLogger { final TaskManagerActions taskManagerActions = new ProhibitFatalErrorTaskManagerActions(); final Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5); - config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000); + config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5)); + config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(60 * 1000)); final Task task = createTaskBuilder()
