This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed07a70e45a [FLINK-33448][runtime] Introduce the new configuration 
'taskmanager.load-balance.mode' (#23658)
ed07a70e45a is described below

commit ed07a70e45a753a259287c9fdbdae73e3415cc21
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Nov 7 17:54:47 2023 +0800

    [FLINK-33448][runtime] Introduce the new configuration 
'taskmanager.load-balance.mode' (#23658)
---
 .../generated/all_taskmanager_section.html         |  6 ++
 .../generated/cluster_configuration.html           |  6 --
 .../generated/expert_scheduling_section.html       | 12 ++--
 .../generated/task_manager_configuration.html      |  6 ++
 .../apache/flink/configuration/ClusterOptions.java |  7 ++
 .../flink/configuration/TaskManagerOptions.java    | 52 +++++++++++++++
 .../TaskManagerLoadBalanceModeTest.java            | 76 ++++++++++++++++++++++
 .../ResourceManagerRuntimeServices.java            |  2 +-
 .../DefaultResourceAllocationStrategy.java         |  5 +-
 .../slotmanager/SlotManagerConfiguration.java      | 21 +++---
 .../runtime/util/SlotSelectionStrategyUtils.java   | 10 +--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 15 +++--
 .../slotmanager/DeclarativeSlotManagerTest.java    |  3 +-
 .../DefaultResourceAllocationStrategyTest.java     | 19 +++---
 ...gerDefaultResourceAllocationStrategyITCase.java |  2 +-
 .../SlotManagerConfigurationBuilder.java           | 15 +++--
 16 files changed, 206 insertions(+), 51 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_section.html 
b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
index bde8c250bdf..256672be9b5 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
@@ -74,6 +74,12 @@
             <td>Boolean</td>
             <td>Whether to kill the TaskManager when the task thread throws an 
OutOfMemoryError.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.load-balance.mode</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>Mode for the load-balance allocation strategy across all 
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code 
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots 
evenly across all available <code 
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code 
class="highlighter-rouge">NONE</code> mode is the default mode without any 
specified strategy.</li></ul><br /><br />Possible 
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>taskmanager.memory.min-segment-size</h5></td>
             <td style="word-wrap: break-word;">256 bytes</td>
diff --git a/docs/layouts/shortcodes/generated/cluster_configuration.html 
b/docs/layouts/shortcodes/generated/cluster_configuration.html
index d734ea405f1..49b6af5f659 100644
--- a/docs/layouts/shortcodes/generated/cluster_configuration.html
+++ b/docs/layouts/shortcodes/generated/cluster_configuration.html
@@ -8,12 +8,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>cluster.evenly-spread-out-slots</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Enable the slot spread out allocation strategy. This strategy 
tries to spread out the slots evenly across all available <code 
class="highlighter-rouge">TaskExecutors</code>.</td>
-        </tr>
         <tr>
             <td><h5>cluster.intercept-user-system-exit</h5></td>
             <td style="word-wrap: break-word;">DISABLED</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 2c07a95372f..96fc3056099 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -8,12 +8,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>cluster.evenly-spread-out-slots</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Enable the slot spread out allocation strategy. This strategy 
tries to spread out the slots evenly across all available <code 
class="highlighter-rouge">TaskExecutors</code>.</td>
-        </tr>
         <tr>
             
<td><h5>execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task</h5></td>
             <td style="word-wrap: break-word;">16 mb</td>
@@ -188,5 +182,11 @@
             <td>Double</td>
             <td>The finished execution ratio threshold to calculate the slow 
tasks detection baseline. Given that the parallelism is N and the ratio is R, 
define T as the median of the first N*R finished tasks' execution time. The 
baseline will be T*M, where M is the multiplier of the baseline. Note that the 
execution time will be weighted with the task's input bytes to ensure the 
accuracy of the detection if data skew occurs.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.load-balance.mode</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>Mode for the load-balance allocation strategy across all 
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code 
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots 
evenly across all available <code 
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code 
class="highlighter-rouge">NONE</code> mode is the default mode without any 
specified strategy.</li></ul><br /><br />Possible 
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html 
b/docs/layouts/shortcodes/generated/task_manager_configuration.html
index 6f2c12d5df3..61e01e96bba 100644
--- a/docs/layouts/shortcodes/generated/task_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html
@@ -56,6 +56,12 @@
             <td>Boolean</td>
             <td>Whether to kill the TaskManager when the task thread throws an 
OutOfMemoryError.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.load-balance.mode</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>Mode for the load-balance allocation strategy across all 
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code 
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots 
evenly across all available <code 
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code 
class="highlighter-rouge">NONE</code> mode is the default mode without any 
specified strategy.</li></ul><br /><br />Possible 
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.bind-policy</h5></td>
             <td style="word-wrap: break-word;">"ip"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 58150beb4b7..c0b9ca8459e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -84,6 +84,13 @@ public class ClusterOptions {
                                     + "By default it will use 4 * the number 
of CPU cores (hardware contexts) that the cluster process has access to. "
                                     + "Increasing the pool size allows to run 
more IO operations concurrently.");
 
+    /**
+     * @deprecated Please use {@link 
TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} instead.
+     *     Note: The 'taskmanager.load-balance.mode: SLOTS' is equal to
+     *     'cluster.evenly-spread-out-slots: true'. The 
'taskmanager.load-balance.mode: NONE' is
+     *     equal to 'cluster.evenly-spread-out-slots: false'.
+     */
+    @Deprecated
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     public static final ConfigOption<Boolean> EVENLY_SPREAD_OUT_SLOTS_STRATEGY 
=
             ConfigOptions.key("cluster.evenly-spread-out-slots")
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 8f4a836d4f3..0e4bc7f7d15 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -25,9 +25,12 @@ import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.util.TimeUtils;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.code;
@@ -708,6 +711,55 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to 
finish all pending timer threads"
                                     + " when the stream task is cancelled.");
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_TASK_MANAGER
+    })
+    public static final ConfigOption<TaskManagerLoadBalanceMode> 
TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Mode for the load-balance 
allocation strategy across all available %s.",
+                                            code("TaskManagers"))
+                                    .list(
+                                            text(
+                                                    "The %s mode tries to 
spread out the slots evenly across all available %s.",
+                                                    
code(TaskManagerLoadBalanceMode.SLOTS.name()),
+                                                    code("TaskManagers")),
+                                            text(
+                                                    "The %s mode is the 
default mode without any specified strategy.",
+                                                    
code(TaskManagerLoadBalanceMode.NONE.name())))
+                                    .build());
+
+    /** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
+    public enum TaskManagerLoadBalanceMode {
+        NONE,
+        SLOTS;
+
+        /**
+         * Loads the {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} from the given
+         * {@link Configuration}. If that option is not set, falls back to the deprecated
+         * {@link ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY} for backward compatibility.
+         */
+        public static TaskManagerLoadBalanceMode loadFromConfiguration(
+                @Nonnull Configuration configuration) {
+            Optional<TaskManagerLoadBalanceMode> 
taskManagerLoadBalanceModeOptional =
+                    
configuration.getOptional(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);
+            if (taskManagerLoadBalanceModeOptional.isPresent()) {
+                return taskManagerLoadBalanceModeOptional.get();
+            }
+            boolean evenlySpreadOutSlots =
+                    
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+            return evenlySpreadOutSlots
+                    ? TaskManagerLoadBalanceMode.SLOTS
+                    : 
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue();
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     /** Not intended to be instantiated. */
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
new file mode 100644
index 00000000000..4ffae33b080
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.configuration.ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY;
+import static 
org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE;
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TaskManagerLoadBalanceMode}. */
+class TaskManagerLoadBalanceModeTest {
+
+    @Test
+    void testReadTaskManagerLoadBalanceMode() {
+        // Check for non-set 'taskmanager.load-balance.mode' and
+        // 'cluster.evenly-spread-out-slots: false'
+        Configuration conf1 = new Configuration();
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf1))
+                .isEqualTo(TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue());
+
+        // Check for non-set 'taskmanager.load-balance.mode' and
+        // 'cluster.evenly-spread-out-slots: true'
+        Configuration conf2 = new Configuration();
+        conf2.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf2))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+        // 'cluster.evenly-spread-out-slots: false'
+        Configuration conf3 = new Configuration();
+        conf3.set(TASK_MANAGER_LOAD_BALANCE_MODE, 
TaskManagerLoadBalanceMode.NONE);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf3))
+                .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+        // 'cluster.evenly-spread-out-slots: true'
+        Configuration conf4 = new Configuration();
+        conf4.set(TASK_MANAGER_LOAD_BALANCE_MODE, 
TaskManagerLoadBalanceMode.NONE);
+        conf4.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf4))
+                .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: SLOTS' 
and
+        // 'cluster.evenly-spread-out-slots: false'
+        Configuration conf5 = new Configuration();
+        conf5.set(TASK_MANAGER_LOAD_BALANCE_MODE, 
TaskManagerLoadBalanceMode.SLOTS);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf5))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: SLOTS' 
and
+        // 'cluster.evenly-spread-out-slots: true'
+        Configuration conf6 = new Configuration();
+        conf6.set(TASK_MANAGER_LOAD_BALANCE_MODE, 
TaskManagerLoadBalanceMode.SLOTS);
+        conf6.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf6))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index 1a18364c207..1368748fb9c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -90,7 +90,7 @@ public class ResourceManagerRuntimeServices {
                             
SlotManagerUtils.generateTaskManagerTotalResourceProfile(
                                     
slotManagerConfiguration.getDefaultWorkerResourceSpec()),
                             slotManagerConfiguration.getNumSlotsPerWorker(),
-                            slotManagerConfiguration.isEvenlySpreadOutSlots(),
+                            
slotManagerConfiguration.getTaskManagerLoadBalanceMode(),
                             slotManagerConfiguration.getTaskManagerTimeout(),
                             
slotManagerConfiguration.getRedundantTaskManagerNum(),
                             slotManagerConfiguration.getMinTotalCpu(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index 9f12c46972c..62f31480b4a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -40,6 +40,7 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
 import static 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils.getEffectiveResourceProfile;
 
 /**
@@ -84,7 +85,7 @@ public class DefaultResourceAllocationStrategy implements 
ResourceAllocationStra
     public DefaultResourceAllocationStrategy(
             ResourceProfile totalResourceProfile,
             int numSlotsPerWorker,
-            boolean evenlySpreadOutSlots,
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
             Time taskManagerTimeout,
             int redundantTaskManagerNum,
             CPUResource minTotalCPU,
@@ -95,7 +96,7 @@ public class DefaultResourceAllocationStrategy implements 
ResourceAllocationStra
                 SlotManagerUtils.generateDefaultSlotResourceProfile(
                         totalResourceProfile, numSlotsPerWorker);
         this.availableResourceMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
                         ? LeastUtilizationResourceMatchingStrategy.INSTANCE
                         : AnyMatchingResourceMatchingStrategy.INSTANCE;
         this.taskManagerTimeout = taskManagerTimeout;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 330a65b7834..a41b0c16ef5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
@@ -33,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import java.math.RoundingMode;
 import java.time.Duration;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
 /** Configuration for the {@link SlotManager}. */
 public class SlotManagerConfiguration {
     private final Time taskManagerRequestTimeout;
@@ -41,7 +42,7 @@ public class SlotManagerConfiguration {
     private final Duration declareNeededResourceDelay;
     private final boolean waitResultConsumedBeforeRelease;
     private final SlotMatchingStrategy slotMatchingStrategy;
-    private final boolean evenlySpreadOutSlots;
+    private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
     private final WorkerResourceSpec defaultWorkerResourceSpec;
     private final int numSlotsPerWorker;
     private final int minSlotNum;
@@ -59,7 +60,7 @@ public class SlotManagerConfiguration {
             Duration declareNeededResourceDelay,
             boolean waitResultConsumedBeforeRelease,
             SlotMatchingStrategy slotMatchingStrategy,
-            boolean evenlySpreadOutSlots,
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
             WorkerResourceSpec defaultWorkerResourceSpec,
             int numSlotsPerWorker,
             int minSlotNum,
@@ -76,7 +77,7 @@ public class SlotManagerConfiguration {
         this.declareNeededResourceDelay = 
Preconditions.checkNotNull(declareNeededResourceDelay);
         this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
         this.slotMatchingStrategy = 
Preconditions.checkNotNull(slotMatchingStrategy);
-        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+        this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
         this.defaultWorkerResourceSpec = 
Preconditions.checkNotNull(defaultWorkerResourceSpec);
         Preconditions.checkState(numSlotsPerWorker > 0);
         this.numSlotsPerWorker = numSlotsPerWorker;
@@ -199,8 +200,8 @@ public class SlotManagerConfiguration {
         return slotMatchingStrategy;
     }
 
-    public boolean isEvenlySpreadOutSlots() {
-        return evenlySpreadOutSlots;
+    public TaskManagerLoadBalanceMode getTaskManagerLoadBalanceMode() {
+        return taskManagerLoadBalanceMode;
     }
 
     public WorkerResourceSpec getDefaultWorkerResourceSpec() {
@@ -260,10 +261,10 @@ public class SlotManagerConfiguration {
                 configuration.getBoolean(
                         
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
 
-        boolean evenlySpreadOutSlots =
-                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                
TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
         final SlotMatchingStrategy slotMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
                         ? LeastUtilizationSlotMatchingStrategy.INSTANCE
                         : AnyMatchingSlotMatchingStrategy.INSTANCE;
 
@@ -282,7 +283,7 @@ public class SlotManagerConfiguration {
                 declareNeededResourceDelay,
                 waitResultConsumedBeforeRelease,
                 slotMatchingStrategy,
-                evenlySpreadOutSlots,
+                taskManagerLoadBalanceMode,
                 defaultWorkerResourceSpec,
                 numSlotsPerWorker,
                 minSlotNum,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
index 3eb7a9f20f3..e4693fa353b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
@@ -20,8 +20,8 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobType;
 import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
@@ -30,6 +30,8 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
 /** Utility class for selecting {@link SlotSelectionStrategy}. */
 public class SlotSelectionStrategyUtils {
 
@@ -37,13 +39,13 @@ public class SlotSelectionStrategyUtils {
 
     public static SlotSelectionStrategy selectSlotSelectionStrategy(
             final JobType jobType, final Configuration configuration) {
-        final boolean evenlySpreadOutSlots =
-                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                
TaskManagerOptions.TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
 
         final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
 
         locationPreferenceSlotSelectionStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
                         ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
                         : 
LocationPreferenceSlotSelectionStrategy.createDefault();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index 57492af56f1..aaa240a2b6d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -34,9 +34,11 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 import java.time.Duration;
 import java.util.concurrent.Executor;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
 /** Builder for {@link DeclarativeSlotManager}. */
 public class DeclarativeSlotManagerBuilder {
-    private boolean evenlySpreadOutSlots;
+    private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
     private final ScheduledExecutor scheduledExecutor;
     private Time taskManagerRequestTimeout;
     private Time taskManagerTimeout;
@@ -53,7 +55,7 @@ public class DeclarativeSlotManagerBuilder {
     private Duration declareNeededResourceDelay;
 
     private DeclarativeSlotManagerBuilder(ScheduledExecutor scheduledExecutor) 
{
-        this.evenlySpreadOutSlots = false;
+        this.taskManagerLoadBalanceMode = TaskManagerLoadBalanceMode.NONE;
         this.scheduledExecutor = scheduledExecutor;
         this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
         this.taskManagerTimeout = TestingUtils.infiniteTime();
@@ -93,8 +95,9 @@ public class DeclarativeSlotManagerBuilder {
         return this;
     }
 
-    public DeclarativeSlotManagerBuilder setEvenlySpreadOutSlots(boolean 
evenlySpreadOutSlots) {
-        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+    public DeclarativeSlotManagerBuilder setTaskManagerLoadBalanceMode(
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+        this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
         return this;
     }
 
@@ -159,10 +162,10 @@ public class DeclarativeSlotManagerBuilder {
                         requirementCheckDelay,
                         declareNeededResourceDelay,
                         waitResultConsumedBeforeRelease,
-                        evenlySpreadOutSlots
+                        taskManagerLoadBalanceMode == 
TaskManagerLoadBalanceMode.SLOTS
                                 ? LeastUtilizationSlotMatchingStrategy.INSTANCE
                                 : AnyMatchingSlotMatchingStrategy.INSTANCE,
-                        evenlySpreadOutSlots,
+                        taskManagerLoadBalanceMode,
                         defaultWorkerResourceSpec,
                         numSlotsPerWorker,
                         minSlotNum,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 339509125eb..5bbb678b343 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -81,6 +81,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DeclarativeSlotManager}. */
@@ -1121,7 +1122,7 @@ class DeclarativeSlotManagerTest {
     void testSpreadOutSlotAllocationStrategy() throws Exception {
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
-                        .setEvenlySpreadOutSlots(true)
+                        
.setTaskManagerLoadBalanceMode(TaskManagerLoadBalanceMode.SLOTS)
                         .buildAndStartWithDirectExec()) {
 
             final List<CompletableFuture<JobID>> requestSlotFutures = new 
ArrayList<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index be46dfe6fd0..b4e61a89e8e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DefaultResourceAllocationStrategy}. */
@@ -44,9 +45,10 @@ class DefaultResourceAllocationStrategyTest {
             ResourceProfile.fromResources(1, 100);
     private static final int NUM_OF_SLOTS = 5;
     private static final DefaultResourceAllocationStrategy ANY_MATCHING_STRATEGY =
-            createStrategy(false);
+            createStrategy(TaskManagerLoadBalanceMode.NONE);
 
-    private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY = createStrategy(true);
+    private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
+            createStrategy(TaskManagerLoadBalanceMode.SLOTS);
 
     @Test
     void testFulfillRequirementWithRegisteredResources() {
@@ -694,20 +696,21 @@ class DefaultResourceAllocationStrategyTest {
                 .hasSize(pendingTaskManagersToAllocate);
     }
 
-    private static DefaultResourceAllocationStrategy createStrategy(boolean evenlySpreadOutSlots) {
-        return createStrategy(evenlySpreadOutSlots, 0);
+    private static DefaultResourceAllocationStrategy createStrategy(
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+        return createStrategy(taskManagerLoadBalanceMode, 0);
     }
 
     private static DefaultResourceAllocationStrategy createStrategy(int redundantTaskManagerNum) {
-        return createStrategy(false, redundantTaskManagerNum);
+        return createStrategy(TaskManagerLoadBalanceMode.NONE, redundantTaskManagerNum);
     }
 
     private static DefaultResourceAllocationStrategy createStrategy(
-            boolean evenlySpreadOutSlots, int redundantTaskManagerNum) {
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, int redundantTaskManagerNum) {
         return new DefaultResourceAllocationStrategy(
                 DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
                 NUM_OF_SLOTS,
-                evenlySpreadOutSlots,
+                taskManagerLoadBalanceMode,
                 Time.milliseconds(0),
                 redundantTaskManagerNum,
                 new CPUResource(0.0),
@@ -719,7 +722,7 @@ class DefaultResourceAllocationStrategyTest {
         return new DefaultResourceAllocationStrategy(
                 DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
                 NUM_OF_SLOTS,
-                false,
+                TaskManagerLoadBalanceMode.NONE,
                 Time.milliseconds(0),
                 0,
                 minRequiredCPU,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index f4e91a6ac70..d0eaeb5020d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -55,7 +55,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
                 new DefaultResourceAllocationStrategy(
                         DEFAULT_TOTAL_RESOURCE_PROFILE,
                         DEFAULT_NUM_SLOTS_PER_WORKER,
-                        slotManagerConfiguration.isEvenlySpreadOutSlots(),
+                        slotManagerConfiguration.getTaskManagerLoadBalanceMode(),
                         slotManagerConfiguration.getTaskManagerTimeout(),
                         slotManagerConfiguration.getRedundantTaskManagerNum(),
                         slotManagerConfiguration.getMinTotalCpu(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
index d5d70e164eb..260592d44dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.testutils.TestingUtils;
 
 import java.time.Duration;
 
+import static org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
 /** Builder for {@link SlotManagerConfiguration}. */
 public class SlotManagerConfigurationBuilder {
     private Time taskManagerRequestTimeout;
@@ -43,7 +45,7 @@ public class SlotManagerConfigurationBuilder {
     private MemorySize minTotalMem;
     private MemorySize maxTotalMem;
     private int redundantTaskManagerNum;
-    private boolean evenlySpreadOutSlots;
+    private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
 
     private SlotManagerConfigurationBuilder() {
         this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
@@ -62,7 +64,7 @@ public class SlotManagerConfigurationBuilder {
         this.maxTotalMem = MemorySize.MAX_VALUE;
         this.redundantTaskManagerNum =
                 ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
-        this.evenlySpreadOutSlots = false;
+        this.taskManagerLoadBalanceMode = TaskManagerLoadBalanceMode.NONE;
     }
 
     public static SlotManagerConfigurationBuilder newBuilder() {
@@ -141,8 +143,9 @@ public class SlotManagerConfigurationBuilder {
         return this;
     }
 
-    public SlotManagerConfigurationBuilder setEvenlySpreadOutSlots(boolean evenlySpreadOutSlots) {
-        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+    public SlotManagerConfigurationBuilder setTaskManagerLoadBalanceMode(
+            TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+        this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
         return this;
     }
 
@@ -153,10 +156,10 @@ public class SlotManagerConfigurationBuilder {
                 requirementCheckDelay,
                 declareNeededResourceDelay,
                 waitResultConsumedBeforeRelease,
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
                         ? LeastUtilizationSlotMatchingStrategy.INSTANCE
                         : AnyMatchingSlotMatchingStrategy.INSTANCE,
-                evenlySpreadOutSlots,
+                taskManagerLoadBalanceMode,
                 defaultWorkerResourceSpec,
                 numSlotsPerWorker,
                 minSlotNum,


Reply via email to