This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a008f25b87130a4320f0c8a46f3f9cd1ad09f503 Author: Weijie Guo <[email protected]> AuthorDate: Mon Jun 5 21:04:38 2023 +0800 [FLINK-32254][runtime] FineGrainedSlotManager may not allocate enough taskmanagers if maxSlotNum is configured This closes #22714 --- .../slotmanager/SlotManagerConfiguration.java | 13 +++++-- .../slotmanager/SlotManagerConfigurationTest.java | 44 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index 3329773b375..fe8442ee04f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -228,8 +228,8 @@ public class SlotManagerConfiguration { ? new CPUResource(Double.MAX_VALUE) : defaultWorkerResourceSpec .getCpuCores() - .divide(defaultWorkerResourceSpec.getNumSlots()) - .multiply(maxSlotNum)); + .multiply(maxSlotNum) + .divide(defaultWorkerResourceSpec.getNumSlots())); } private static MemorySize getMaxTotalMem( @@ -244,7 +244,12 @@ public class SlotManagerConfiguration { ? MemorySize.MAX_VALUE : defaultWorkerResourceSpec .getTotalMemSize() - .divide(defaultWorkerResourceSpec.getNumSlots()) - .multiply(maxSlotNum)); + // In theory, there is a possibility of long + // overflow here. However, in actual scenarios, for + // a 1TB of TM memory and a very large number of + // maxSlotNum (e.g. 1_000_000), there is still no + // overflow. + .multiply(maxSlotNum) + .divide(defaultWorkerResourceSpec.getNumSlots())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java index 782392e6378..136a48d2bad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link SlotManagerConfiguration}. */ @@ -64,4 +67,45 @@ class SlotManagerConfigurationTest { assertThat(legacySlotIdleTimeout) .isEqualTo(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds()); } + + @Test + void testComputeMaxTotalCpu() throws Exception { + final Configuration configuration = new Configuration(); + final int maxSlotNum = 9; + final int numSlots = 3; + final double cpuCores = 10; + configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum); + final SlotManagerConfiguration slotManagerConfiguration = + SlotManagerConfiguration.fromConfiguration( + configuration, + new WorkerResourceSpec.Builder() + .setNumSlots(numSlots) + .setCpuCores(cpuCores) + .build()); + assertThat(slotManagerConfiguration.getMaxTotalCpu().getValue().doubleValue()) + .isEqualTo(cpuCores * maxSlotNum / numSlots); + } + + @Test + void testComputeMaxTotalMemory() throws Exception { + final Configuration configuration = new Configuration(); + final int maxSlotNum = 1_000_000; + final int numSlots = 10; + final int totalTaskManagerMB = + MemorySize.parse("1", MemorySize.MemoryUnit.TERA_BYTES).getMebiBytes(); + configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum); + final SlotManagerConfiguration slotManagerConfiguration = + SlotManagerConfiguration.fromConfiguration( + configuration, + new WorkerResourceSpec.Builder() + .setNumSlots(numSlots) + .setTaskHeapMemoryMB(totalTaskManagerMB) + .build()); + assertThat(slotManagerConfiguration.getMaxTotalMem().getBytes()) + .isEqualTo( + BigDecimal.valueOf(MemorySize.ofMebiBytes(totalTaskManagerMB).getBytes()) + .multiply(BigDecimal.valueOf(maxSlotNum)) + .divide(BigDecimal.valueOf(numSlots)) + .longValue()); + } }
