This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a008f25b87130a4320f0c8a46f3f9cd1ad09f503
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Jun 5 21:04:38 2023 +0800

    [FLINK-32254][runtime] FineGrainedSlotManager may not allocate enough taskmanagers if maxSlotNum is configured
    
    This closes #22714
---
 .../slotmanager/SlotManagerConfiguration.java      | 13 +++++--
 .../slotmanager/SlotManagerConfigurationTest.java  | 44 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 3329773b375..fe8442ee04f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -228,8 +228,8 @@ public class SlotManagerConfiguration {
                                         ? new CPUResource(Double.MAX_VALUE)
                                         : defaultWorkerResourceSpec
                                                 .getCpuCores()
-                                                
.divide(defaultWorkerResourceSpec.getNumSlots())
-                                                .multiply(maxSlotNum));
+                                                .multiply(maxSlotNum)
+                                                
.divide(defaultWorkerResourceSpec.getNumSlots()));
     }
 
     private static MemorySize getMaxTotalMem(
@@ -244,7 +244,12 @@ public class SlotManagerConfiguration {
                                         ? MemorySize.MAX_VALUE
                                         : defaultWorkerResourceSpec
                                                 .getTotalMemSize()
-                                                
.divide(defaultWorkerResourceSpec.getNumSlots())
-                                                .multiply(maxSlotNum));
+                                                // In theory, there is a 
possibility of long
+                                                // overflow here. However, in 
actual scenarios, for
+                                                // a 1TB of TM memory and a 
very large number of
+                                                // maxSlotNum (e.g. 
1_000_000), there is still no
+                                                // overflow.
+                                                .multiply(maxSlotNum)
+                                                
.divide(defaultWorkerResourceSpec.getNumSlots()));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
index 782392e6378..136a48d2bad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SlotManagerConfiguration}. */
@@ -64,4 +67,45 @@ class SlotManagerConfigurationTest {
         assertThat(legacySlotIdleTimeout)
                 
.isEqualTo(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds());
     }
+
+    @Test
+    void testComputeMaxTotalCpu() throws Exception {
+        final Configuration configuration = new Configuration();
+        final int maxSlotNum = 9;
+        final int numSlots = 3;
+        final double cpuCores = 10;
+        configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+        final SlotManagerConfiguration slotManagerConfiguration =
+                SlotManagerConfiguration.fromConfiguration(
+                        configuration,
+                        new WorkerResourceSpec.Builder()
+                                .setNumSlots(numSlots)
+                                .setCpuCores(cpuCores)
+                                .build());
+        
assertThat(slotManagerConfiguration.getMaxTotalCpu().getValue().doubleValue())
+                .isEqualTo(cpuCores * maxSlotNum / numSlots);
+    }
+
+    @Test
+    void testComputeMaxTotalMemory() throws Exception {
+        final Configuration configuration = new Configuration();
+        final int maxSlotNum = 1_000_000;
+        final int numSlots = 10;
+        final int totalTaskManagerMB =
+                MemorySize.parse("1", 
MemorySize.MemoryUnit.TERA_BYTES).getMebiBytes();
+        configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+        final SlotManagerConfiguration slotManagerConfiguration =
+                SlotManagerConfiguration.fromConfiguration(
+                        configuration,
+                        new WorkerResourceSpec.Builder()
+                                .setNumSlots(numSlots)
+                                .setTaskHeapMemoryMB(totalTaskManagerMB)
+                                .build());
+        assertThat(slotManagerConfiguration.getMaxTotalMem().getBytes())
+                .isEqualTo(
+                        
BigDecimal.valueOf(MemorySize.ofMebiBytes(totalTaskManagerMB).getBytes())
+                                .multiply(BigDecimal.valueOf(maxSlotNum))
+                                .divide(BigDecimal.valueOf(numSlots))
+                                .longValue());
+    }
 }

Reply via email to