This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ed07a70e45a [FLINK-33448][runtime] Introduce the new configuration
'taskmanager.load-balance.mode' (#23658)
ed07a70e45a is described below
commit ed07a70e45a753a259287c9fdbdae73e3415cc21
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Nov 7 17:54:47 2023 +0800
[FLINK-33448][runtime] Introduce the new configuration
'taskmanager.load-balance.mode' (#23658)
---
.../generated/all_taskmanager_section.html | 6 ++
.../generated/cluster_configuration.html | 6 --
.../generated/expert_scheduling_section.html | 12 ++--
.../generated/task_manager_configuration.html | 6 ++
.../apache/flink/configuration/ClusterOptions.java | 7 ++
.../flink/configuration/TaskManagerOptions.java | 52 +++++++++++++++
.../TaskManagerLoadBalanceModeTest.java | 76 ++++++++++++++++++++++
.../ResourceManagerRuntimeServices.java | 2 +-
.../DefaultResourceAllocationStrategy.java | 5 +-
.../slotmanager/SlotManagerConfiguration.java | 21 +++---
.../runtime/util/SlotSelectionStrategyUtils.java | 10 +--
.../slotmanager/DeclarativeSlotManagerBuilder.java | 15 +++--
.../slotmanager/DeclarativeSlotManagerTest.java | 3 +-
.../DefaultResourceAllocationStrategyTest.java | 19 +++---
...gerDefaultResourceAllocationStrategyITCase.java | 2 +-
.../SlotManagerConfigurationBuilder.java | 15 +++--
16 files changed, 206 insertions(+), 51 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_section.html
b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
index bde8c250bdf..256672be9b5 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
@@ -74,6 +74,12 @@
<td>Boolean</td>
<td>Whether to kill the TaskManager when the task thread throws an
OutOfMemoryError.</td>
</tr>
+ <tr>
+ <td><h5>taskmanager.load-balance.mode</h5></td>
+ <td style="word-wrap: break-word;">NONE</td>
+ <td><p>Enum</p></td>
+ <td>Mode for the load-balance allocation strategy across all
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots
evenly across all available <code
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code
class="highlighter-rouge">NONE</code> mode is the default mode without any
specified strategy.</li></ul><br /><br />Possible
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+ </tr>
<tr>
<td><h5>taskmanager.memory.min-segment-size</h5></td>
<td style="word-wrap: break-word;">256 bytes</td>
diff --git a/docs/layouts/shortcodes/generated/cluster_configuration.html
b/docs/layouts/shortcodes/generated/cluster_configuration.html
index d734ea405f1..49b6af5f659 100644
--- a/docs/layouts/shortcodes/generated/cluster_configuration.html
+++ b/docs/layouts/shortcodes/generated/cluster_configuration.html
@@ -8,12 +8,6 @@
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>cluster.evenly-spread-out-slots</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Enable the slot spread out allocation strategy. This strategy
tries to spread out the slots evenly across all available <code
class="highlighter-rouge">TaskExecutors</code>.</td>
- </tr>
<tr>
<td><h5>cluster.intercept-user-system-exit</h5></td>
<td style="word-wrap: break-word;">DISABLED</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 2c07a95372f..96fc3056099 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -8,12 +8,6 @@
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>cluster.evenly-spread-out-slots</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Enable the slot spread out allocation strategy. This strategy
tries to spread out the slots evenly across all available <code
class="highlighter-rouge">TaskExecutors</code>.</td>
- </tr>
<tr>
<td><h5>execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task</h5></td>
<td style="word-wrap: break-word;">16 mb</td>
@@ -188,5 +182,11 @@
<td>Double</td>
<td>The finished execution ratio threshold to calculate the slow
tasks detection baseline. Given that the parallelism is N and the ratio is R,
define T as the median of the first N*R finished tasks' execution time. The
baseline will be T*M, where M is the multiplier of the baseline. Note that the
execution time will be weighted with the task's input bytes to ensure the
accuracy of the detection if data skew occurs.</td>
</tr>
+ <tr>
+ <td><h5>taskmanager.load-balance.mode</h5></td>
+ <td style="word-wrap: break-word;">NONE</td>
+ <td><p>Enum</p></td>
+ <td>Mode for the load-balance allocation strategy across all
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots
evenly across all available <code
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code
class="highlighter-rouge">NONE</code> mode is the default mode without any
specified strategy.</li></ul><br /><br />Possible
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+ </tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html
b/docs/layouts/shortcodes/generated/task_manager_configuration.html
index 6f2c12d5df3..61e01e96bba 100644
--- a/docs/layouts/shortcodes/generated/task_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html
@@ -56,6 +56,12 @@
<td>Boolean</td>
<td>Whether to kill the TaskManager when the task thread throws an
OutOfMemoryError.</td>
</tr>
+ <tr>
+ <td><h5>taskmanager.load-balance.mode</h5></td>
+ <td style="word-wrap: break-word;">NONE</td>
+ <td><p>Enum</p></td>
+ <td>Mode for the load-balance allocation strategy across all
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots
evenly across all available <code
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code
class="highlighter-rouge">NONE</code> mode is the default mode without any
specified strategy.</li></ul><br /><br />Possible
values:<ul><li>"NONE"</li><li>"SLOTS"</li></ul></td>
+ </tr>
<tr>
<td><h5>taskmanager.network.bind-policy</h5></td>
<td style="word-wrap: break-word;">"ip"</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 58150beb4b7..c0b9ca8459e 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -84,6 +84,13 @@ public class ClusterOptions {
+ "By default it will use 4 * the number
of CPU cores (hardware contexts) that the cluster process has access to. "
+ "Increasing the pool size allows to run
more IO operations concurrently.");
+ /**
+ * @deprecated Please use {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} instead.
+ * Note: 'taskmanager.load-balance.mode: SLOTS' is equivalent to
+ * 'cluster.evenly-spread-out-slots: true', and 'taskmanager.load-balance.mode: NONE' is
+ * equivalent to 'cluster.evenly-spread-out-slots: false'.
+ */
+ @Deprecated
@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
public static final ConfigOption<Boolean> EVENLY_SPREAD_OUT_SLOTS_STRATEGY
=
ConfigOptions.key("cluster.evenly-spread-out-slots")
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 8f4a836d4f3..0e4bc7f7d15 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -25,9 +25,12 @@ import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.util.TimeUtils;
+import javax.annotation.Nonnull;
+
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
@@ -708,6 +711,55 @@ public class TaskManagerOptions {
"Time we wait for the timers in milliseconds to
finish all pending timer threads"
+ " when the stream task is cancelled.");
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_TASK_MANAGER
+ })
+ public static final ConfigOption<TaskManagerLoadBalanceMode>
TASK_MANAGER_LOAD_BALANCE_MODE =
+ ConfigOptions.key("taskmanager.load-balance.mode")
+ .enumType(TaskManagerLoadBalanceMode.class)
+ .defaultValue(TaskManagerLoadBalanceMode.NONE)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Mode for the load-balance
allocation strategy across all available %s.",
+ code("TaskManagers"))
+ .list(
+ text(
+ "The %s mode tries to
spread out the slots evenly across all available %s.",
+
code(TaskManagerLoadBalanceMode.SLOTS.name()),
+ code("TaskManagers")),
+ text(
+ "The %s mode is the
default mode without any specified strategy.",
+
code(TaskManagerLoadBalanceMode.NONE.name())))
+ .build());
+
+ /** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
+ public enum TaskManagerLoadBalanceMode {
+ NONE,
+ SLOTS;
+
+ /**
+ * Loads the {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} from the given
+ * {@link Configuration}. If that option is not set, falls back to the deprecated
+ * {@link ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY} for backward compatibility.
+ */
+ public static TaskManagerLoadBalanceMode loadFromConfiguration(
+ @Nonnull Configuration configuration) {
+ Optional<TaskManagerLoadBalanceMode>
taskManagerLoadBalanceModeOptional =
+
configuration.getOptional(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);
+ if (taskManagerLoadBalanceModeOptional.isPresent()) {
+ return taskManagerLoadBalanceModeOptional.get();
+ }
+ boolean evenlySpreadOutSlots =
+
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+ return evenlySpreadOutSlots
+ ? TaskManagerLoadBalanceMode.SLOTS
+ :
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue();
+ }
+ }
+
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
new file mode 100644
index 00000000000..4ffae33b080
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.flink.configuration.ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY;
+import static
org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TaskManagerLoadBalanceMode}. */
+class TaskManagerLoadBalanceModeTest {
+
+ @Test
+ void testReadTaskManagerLoadBalanceMode() {
+ // Check for non-set 'taskmanager.load-balance.mode' and
+ // 'cluster.evenly-spread-out-slots: false'
+ Configuration conf1 = new Configuration();
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf1))
+ .isEqualTo(TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue());
+
+ // Check for non-set 'taskmanager.load-balance.mode' and
+ // 'cluster.evenly-spread-out-slots: true'
+ Configuration conf2 = new Configuration();
+ conf2.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf2))
+ .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+ // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+ // 'cluster.evenly-spread-out-slots: false'
+ Configuration conf3 = new Configuration();
+ conf3.set(TASK_MANAGER_LOAD_BALANCE_MODE,
TaskManagerLoadBalanceMode.NONE);
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf3))
+ .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+ // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+ // 'cluster.evenly-spread-out-slots: true'
+ Configuration conf4 = new Configuration();
+ conf4.set(TASK_MANAGER_LOAD_BALANCE_MODE,
TaskManagerLoadBalanceMode.NONE);
+ conf4.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf4))
+ .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+ // Check for setting manually 'taskmanager.load-balance.mode: SLOTS'
and
+ // 'cluster.evenly-spread-out-slots: false'
+ Configuration conf5 = new Configuration();
+ conf5.set(TASK_MANAGER_LOAD_BALANCE_MODE,
TaskManagerLoadBalanceMode.SLOTS);
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf5))
+ .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+ // Check for setting manually 'taskmanager.load-balance.mode: SLOTS'
and
+ // 'cluster.evenly-spread-out-slots: true'
+ Configuration conf6 = new Configuration();
+ conf6.set(TASK_MANAGER_LOAD_BALANCE_MODE,
TaskManagerLoadBalanceMode.SLOTS);
+ conf6.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+ assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf6))
+ .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index 1a18364c207..1368748fb9c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -90,7 +90,7 @@ public class ResourceManagerRuntimeServices {
SlotManagerUtils.generateTaskManagerTotalResourceProfile(
slotManagerConfiguration.getDefaultWorkerResourceSpec()),
slotManagerConfiguration.getNumSlotsPerWorker(),
- slotManagerConfiguration.isEvenlySpreadOutSlots(),
+
slotManagerConfiguration.getTaskManagerLoadBalanceMode(),
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.getRedundantTaskManagerNum(),
slotManagerConfiguration.getMinTotalCpu(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index 9f12c46972c..62f31480b4a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -40,6 +40,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
import static
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils.getEffectiveResourceProfile;
/**
@@ -84,7 +85,7 @@ public class DefaultResourceAllocationStrategy implements
ResourceAllocationStra
public DefaultResourceAllocationStrategy(
ResourceProfile totalResourceProfile,
int numSlotsPerWorker,
- boolean evenlySpreadOutSlots,
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
Time taskManagerTimeout,
int redundantTaskManagerNum,
CPUResource minTotalCPU,
@@ -95,7 +96,7 @@ public class DefaultResourceAllocationStrategy implements
ResourceAllocationStra
SlotManagerUtils.generateDefaultSlotResourceProfile(
totalResourceProfile, numSlotsPerWorker);
this.availableResourceMatchingStrategy =
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
? LeastUtilizationResourceMatchingStrategy.INSTANCE
: AnyMatchingResourceMatchingStrategy.INSTANCE;
this.taskManagerTimeout = taskManagerTimeout;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 330a65b7834..a41b0c16ef5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
@@ -33,6 +32,8 @@ import org.apache.flink.util.Preconditions;
import java.math.RoundingMode;
import java.time.Duration;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
/** Configuration for the {@link SlotManager}. */
public class SlotManagerConfiguration {
private final Time taskManagerRequestTimeout;
@@ -41,7 +42,7 @@ public class SlotManagerConfiguration {
private final Duration declareNeededResourceDelay;
private final boolean waitResultConsumedBeforeRelease;
private final SlotMatchingStrategy slotMatchingStrategy;
- private final boolean evenlySpreadOutSlots;
+ private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final int minSlotNum;
@@ -59,7 +60,7 @@ public class SlotManagerConfiguration {
Duration declareNeededResourceDelay,
boolean waitResultConsumedBeforeRelease,
SlotMatchingStrategy slotMatchingStrategy,
- boolean evenlySpreadOutSlots,
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
WorkerResourceSpec defaultWorkerResourceSpec,
int numSlotsPerWorker,
int minSlotNum,
@@ -76,7 +77,7 @@ public class SlotManagerConfiguration {
this.declareNeededResourceDelay =
Preconditions.checkNotNull(declareNeededResourceDelay);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
this.slotMatchingStrategy =
Preconditions.checkNotNull(slotMatchingStrategy);
- this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+ this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
this.defaultWorkerResourceSpec =
Preconditions.checkNotNull(defaultWorkerResourceSpec);
Preconditions.checkState(numSlotsPerWorker > 0);
this.numSlotsPerWorker = numSlotsPerWorker;
@@ -199,8 +200,8 @@ public class SlotManagerConfiguration {
return slotMatchingStrategy;
}
- public boolean isEvenlySpreadOutSlots() {
- return evenlySpreadOutSlots;
+ public TaskManagerLoadBalanceMode getTaskManagerLoadBalanceMode() {
+ return taskManagerLoadBalanceMode;
}
public WorkerResourceSpec getDefaultWorkerResourceSpec() {
@@ -260,10 +261,10 @@ public class SlotManagerConfiguration {
configuration.getBoolean(
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
- boolean evenlySpreadOutSlots =
-
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+
TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
final SlotMatchingStrategy slotMatchingStrategy =
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
? LeastUtilizationSlotMatchingStrategy.INSTANCE
: AnyMatchingSlotMatchingStrategy.INSTANCE;
@@ -282,7 +283,7 @@ public class SlotManagerConfiguration {
declareNeededResourceDelay,
waitResultConsumedBeforeRelease,
slotMatchingStrategy,
- evenlySpreadOutSlots,
+ taskManagerLoadBalanceMode,
defaultWorkerResourceSpec,
numSlotsPerWorker,
minSlotNum,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
index 3eb7a9f20f3..e4693fa353b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
@@ -20,8 +20,8 @@
package org.apache.flink.runtime.util;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobType;
import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
@@ -30,6 +30,8 @@ import
org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
/** Utility class for selecting {@link SlotSelectionStrategy}. */
public class SlotSelectionStrategyUtils {
@@ -37,13 +39,13 @@ public class SlotSelectionStrategyUtils {
public static SlotSelectionStrategy selectSlotSelectionStrategy(
final JobType jobType, final Configuration configuration) {
- final boolean evenlySpreadOutSlots =
-
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+
TaskManagerOptions.TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
locationPreferenceSlotSelectionStrategy =
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
?
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
:
LocationPreferenceSlotSelectionStrategy.createDefault();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index 57492af56f1..aaa240a2b6d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -34,9 +34,11 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
import java.time.Duration;
import java.util.concurrent.Executor;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
/** Builder for {@link DeclarativeSlotManager}. */
public class DeclarativeSlotManagerBuilder {
- private boolean evenlySpreadOutSlots;
+ private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
private final ScheduledExecutor scheduledExecutor;
private Time taskManagerRequestTimeout;
private Time taskManagerTimeout;
@@ -53,7 +55,7 @@ public class DeclarativeSlotManagerBuilder {
private Duration declareNeededResourceDelay;
private DeclarativeSlotManagerBuilder(ScheduledExecutor scheduledExecutor)
{
- this.evenlySpreadOutSlots = false;
+ this.taskManagerLoadBalanceMode = TaskManagerLoadBalanceMode.NONE;
this.scheduledExecutor = scheduledExecutor;
this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
this.taskManagerTimeout = TestingUtils.infiniteTime();
@@ -93,8 +95,9 @@ public class DeclarativeSlotManagerBuilder {
return this;
}
- public DeclarativeSlotManagerBuilder setEvenlySpreadOutSlots(boolean
evenlySpreadOutSlots) {
- this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+ public DeclarativeSlotManagerBuilder setTaskManagerLoadBalanceMode(
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+ this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
return this;
}
@@ -159,10 +162,10 @@ public class DeclarativeSlotManagerBuilder {
requirementCheckDelay,
declareNeededResourceDelay,
waitResultConsumedBeforeRelease,
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode ==
TaskManagerLoadBalanceMode.SLOTS
? LeastUtilizationSlotMatchingStrategy.INSTANCE
: AnyMatchingSlotMatchingStrategy.INSTANCE,
- evenlySpreadOutSlots,
+ taskManagerLoadBalanceMode,
defaultWorkerResourceSpec,
numSlotsPerWorker,
minSlotNum,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 339509125eb..5bbb678b343 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -81,6 +81,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link DeclarativeSlotManager}. */
@@ -1121,7 +1122,7 @@ class DeclarativeSlotManagerTest {
void testSpreadOutSlotAllocationStrategy() throws Exception {
try (DeclarativeSlotManager slotManager =
createDeclarativeSlotManagerBuilder()
- .setEvenlySpreadOutSlots(true)
+
.setTaskManagerLoadBalanceMode(TaskManagerLoadBalanceMode.SLOTS)
.buildAndStartWithDirectExec()) {
final List<CompletableFuture<JobID>> requestSlotFutures = new
ArrayList<>();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index be46dfe6fd0..b4e61a89e8e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link DefaultResourceAllocationStrategy}. */
@@ -44,9 +45,10 @@ class DefaultResourceAllocationStrategyTest {
ResourceProfile.fromResources(1, 100);
private static final int NUM_OF_SLOTS = 5;
private static final DefaultResourceAllocationStrategy
ANY_MATCHING_STRATEGY =
- createStrategy(false);
+ createStrategy(TaskManagerLoadBalanceMode.NONE);
- private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
createStrategy(true);
+ private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
+ createStrategy(TaskManagerLoadBalanceMode.SLOTS);
@Test
void testFulfillRequirementWithRegisteredResources() {
@@ -694,20 +696,21 @@ class DefaultResourceAllocationStrategyTest {
.hasSize(pendingTaskManagersToAllocate);
}
- private static DefaultResourceAllocationStrategy createStrategy(boolean
evenlySpreadOutSlots) {
- return createStrategy(evenlySpreadOutSlots, 0);
+ private static DefaultResourceAllocationStrategy createStrategy(
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+ return createStrategy(taskManagerLoadBalanceMode, 0);
}
private static DefaultResourceAllocationStrategy createStrategy(int
redundantTaskManagerNum) {
- return createStrategy(false, redundantTaskManagerNum);
+ return createStrategy(TaskManagerLoadBalanceMode.NONE,
redundantTaskManagerNum);
}
private static DefaultResourceAllocationStrategy createStrategy(
- boolean evenlySpreadOutSlots, int redundantTaskManagerNum) {
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, int
redundantTaskManagerNum) {
return new DefaultResourceAllocationStrategy(
DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
NUM_OF_SLOTS,
- evenlySpreadOutSlots,
+ taskManagerLoadBalanceMode,
Time.milliseconds(0),
redundantTaskManagerNum,
new CPUResource(0.0),
@@ -719,7 +722,7 @@ class DefaultResourceAllocationStrategyTest {
return new DefaultResourceAllocationStrategy(
DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
NUM_OF_SLOTS,
- false,
+ TaskManagerLoadBalanceMode.NONE,
Time.milliseconds(0),
0,
minRequiredCPU,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index f4e91a6ac70..d0eaeb5020d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -55,7 +55,7 @@ class
FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
new DefaultResourceAllocationStrategy(
DEFAULT_TOTAL_RESOURCE_PROFILE,
DEFAULT_NUM_SLOTS_PER_WORKER,
- slotManagerConfiguration.isEvenlySpreadOutSlots(),
+
slotManagerConfiguration.getTaskManagerLoadBalanceMode(),
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.getRedundantTaskManagerNum(),
slotManagerConfiguration.getMinTotalCpu(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
index d5d70e164eb..260592d44dc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.testutils.TestingUtils;
import java.time.Duration;
+import static
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+
/** Builder for {@link SlotManagerConfiguration}. */
public class SlotManagerConfigurationBuilder {
private Time taskManagerRequestTimeout;
@@ -43,7 +45,7 @@ public class SlotManagerConfigurationBuilder {
private MemorySize minTotalMem;
private MemorySize maxTotalMem;
private int redundantTaskManagerNum;
- private boolean evenlySpreadOutSlots;
+ private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
private SlotManagerConfigurationBuilder() {
this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
@@ -62,7 +64,7 @@ public class SlotManagerConfigurationBuilder {
this.maxTotalMem = MemorySize.MAX_VALUE;
this.redundantTaskManagerNum =
ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
- this.evenlySpreadOutSlots = false;
+ this.taskManagerLoadBalanceMode = TaskManagerLoadBalanceMode.NONE;
}
public static SlotManagerConfigurationBuilder newBuilder() {
@@ -141,8 +143,9 @@ public class SlotManagerConfigurationBuilder {
return this;
}
- public SlotManagerConfigurationBuilder setEvenlySpreadOutSlots(boolean
evenlySpreadOutSlots) {
- this.evenlySpreadOutSlots = evenlySpreadOutSlots;
+ public SlotManagerConfigurationBuilder setTaskManagerLoadBalanceMode(
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
+ this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
return this;
}
@@ -153,10 +156,10 @@ public class SlotManagerConfigurationBuilder {
requirementCheckDelay,
declareNeededResourceDelay,
waitResultConsumedBeforeRelease,
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
? LeastUtilizationSlotMatchingStrategy.INSTANCE
: AnyMatchingSlotMatchingStrategy.INSTANCE,
- evenlySpreadOutSlots,
+ taskManagerLoadBalanceMode,
defaultWorkerResourceSpec,
numSlotsPerWorker,
minSlotNum,