This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 8cd774f7fcf [FLINK-33977][runtime] Support minimize TM number during
downscaling in adaptive scheduler
8cd774f7fcf is described below
commit 8cd774f7fcf71275a19fc0d7b5bb21a1ad90bd98
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu May 15 23:25:10 2025 +0800
[FLINK-33977][runtime] Support minimize TM number during downscaling in
adaptive scheduler
Also enable this strategy by default via the introduced config option
---
.../generated/all_jobmanager_section.html | 6 ++
.../generated/expert_scheduling_section.html | 6 ++
.../generated/job_manager_configuration.html | 6 ++
.../flink/configuration/JobManagerOptions.java | 27 +++++
.../adaptive/AdaptiveSchedulerFactory.java | 17 ++-
...DefaultSlotAssigner.java => AllocatorUtil.java} | 57 +++++-----
.../adaptive/allocator/DefaultSlotAssigner.java | 118 +++++++++++++++++----
.../allocator/SlotSharingSlotAllocator.java | 32 ++++--
.../allocator/StateLocalitySlotAssigner.java | 9 +-
.../adaptive/AdaptiveSchedulerBuilder.java | 8 +-
.../allocator/DefaultSlotAssignerTest.java | 109 +++++++++++++++++++
.../allocator/SlotSharingSlotAllocatorTest.java | 54 +++++++---
.../scheduler/adaptive/allocator/TestSlotInfo.java | 15 ++-
13 files changed, 382 insertions(+), 82 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 626aa50f386..e9efa60e78e 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -14,6 +14,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 86682f4fbfd..44495201f8a 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -92,6 +92,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index c01601a031a..c8bd7f381f8 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -14,6 +14,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 77bfce2a9f8..ed179cc12aa 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,7 @@
package org.apache.flink.configuration;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
@@ -30,6 +31,7 @@ import static
org.apache.flink.configuration.ConfigOptions.key;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
+import static
org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
@@ -677,6 +679,31 @@ public class JobManagerOptions {
code(SchedulerType.AdaptiveBatch.name()))
.build());
+ @Experimental
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean>
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+ key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "This parameter defines whether
the adaptive scheduler prioritizes "
+ + "using the minimum
number of %s when scheduling tasks.",
+ code("TaskManagers"))
+ .linebreak()
+ .text(
+ "Note, this parameter is suitable
if %s is not enabled. "
+ + "More details about this
configuration are available at %s.",
+ code(LOCAL_RECOVERY.key()),
+ link(
+
"https://issues.apache.org/jira/browse/FLINK-33977",
+ "FLINK-33977"))
+ .build());
+
/** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index d707700149a..3dd0c0a014b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
@@ -48,6 +49,8 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -106,7 +109,10 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
final SlotSharingSlotAllocator slotAllocator =
createSlotSharingSlotAllocator(
declarativeSlotPool,
-
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY));
+
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+ jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -148,11 +154,16 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
- DeclarativeSlotPool declarativeSlotPool, boolean
localRecoveryEnabled) {
+ DeclarativeSlotPool declarativeSlotPool,
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot,
declarativeSlotPool::containsFreeSlot,
- localRecoveryEnabled);
+ localRecoveryEnabled,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
similarity index 55%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 0a8813b33a6..d516aae2596 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,46 +18,53 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
-public class DefaultSlotAssigner implements SlotAssigner {
+import static org.apache.flink.util.Preconditions.checkState;
- @Override
- public Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
- for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
-
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
- }
+/**
+ * Utility methods shared by {@link SlotSharingSlotAllocator} and the {@link SlotAssigner}
+ * implementations.
+ */
+class AllocatorUtil {
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
- Collection<SlotAssignment> assignments = new ArrayList<>();
- for (ExecutionSlotSharingGroup group : allGroups) {
- assignments.add(new SlotAssignment(iterator.next(), group));
- }
- return assignments;
+ private AllocatorUtil() {}
+
+ static Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
+ return
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ }
+
+ static int getMinimumRequiredSlots(
+ Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ slotSharingGroupMetaInfos) {
+ return slotSharingGroupMetaInfos.values().stream()
+
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
+ .reduce(0, Integer::sum);
+ }
+
+ static void checkMinimumRequiredSlots(
+ JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
+ final int minimumRequiredSlots =
+
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
+ checkState(
+ freeSlots.size() >= minimumRequiredSlots,
+ "Not enough slots to allocate all the execution slot sharing
groups (have: %s, need: %s)",
+ freeSlots.size(),
+ minimumRequiredSlots);
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
+ static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+ createExecutionSlotSharingGroups(
+ VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
slotSharingGroup
.getJobVertexIds()
@@ -71,7 +78,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
}
});
return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
+ .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
.collect(Collectors.toList());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 0a8813b33a6..2dc0f6dac56 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -18,37 +18,64 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
+import static java.util.function.Function.identity;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. As an
+ * exception, when the cluster is deployed in application mode and {@link
+ * org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to slots on a minimal
+ * number of task managers.
+ */
public class DefaultSlotAssigner implements SlotAssigner {
+ @VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET =
"embedded";
+
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
+
+ DefaultSlotAssigner(@Nullable String executionTarget, boolean
minimalTaskManagerPreferred) {
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+ }
+
@Override
public Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ final Collection<? extends SlotInfo> pickedSlots =
+ pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+ Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
Collection<SlotAssignment> assignments = new ArrayList<>();
for (ExecutionSlotSharingGroup group : allGroups) {
assignments.add(new SlotAssignment(iterator.next(), group));
@@ -56,22 +83,67 @@ public class DefaultSlotAssigner implements SlotAssigner {
return assignments;
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
- final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
- slotSharingGroup
- .getJobVertexIds()
- .forEach(
- jobVertexId -> {
- int parallelism =
vertexParallelism.getParallelism(jobVertexId);
- for (int subtaskIdx = 0; subtaskIdx < parallelism;
subtaskIdx++) {
- sharedSlotToVertexAssignment
- .computeIfAbsent(subtaskIdx, ignored
-> new HashSet<>())
- .add(new
ExecutionVertexID(jobVertexId, subtaskIdx));
- }
- });
- return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
- .collect(Collectors.toList());
+ @VisibleForTesting
+ Collection<? extends SlotInfo> pickSlotsIfNeeded(
+ int requestExecutionSlotSharingGroups, Collection<? extends
SlotInfo> freeSlots) {
+ Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+ && minimalTaskManagerPreferred
+ // Skip the grouping and sorting work when every free slot is needed anyway.
+ && freeSlots.size() > requestExecutionSlotSharingGroups) {
+ final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor =
+ getSlotsPerTaskExecutor(freeSlots);
+ pickedSlots =
+ pickSlotsInMinimalTaskExecutors(
+ slotsPerTaskExecutor,
requestExecutionSlotSharingGroups);
+ }
+ return pickedSlots;
+ }
+
+ /**
+ * Orders the task executors by their number of free slots, largest first. Selecting slots on
+ * the task executors with the most free slots first minimizes the number of task executors
+ * used at the resource manager side in application mode, so that more task executors can be
+ * released in a timely manner.
+ *
+ * @param slotsPerTaskExecutor The free slots grouped by task executor.
+ * @return An iterator over the task executors, sorted by their number of free slots in
+ * descending order.
+ */
+ private Iterator<TaskManagerLocation> getSortedTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor) {
+ final Comparator<TaskManagerLocation> taskExecutorComparator =
+ (leftTml, rightTml) ->
+ Integer.compare(
+ slotsPerTaskExecutor.get(rightTml).size(),
+ slotsPerTaskExecutor.get(leftTml).size());
+ return
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
+ }
+
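+ /**
+ * Picks the slots to assign to the requested groups from as few task executors as possible.
+ *
+ * <p>Task executors are visited in descending order of their number of free slots, and all
+ * slots of a visited task executor are picked, so the result may contain more slots than
+ * requested.
+ *
+ * @param slotsByTaskExecutor The free slots grouped by task executor.
+ * @param requestedGroups The number of requested execution slot sharing groups.
+ * @return The picked slots, which are located on a minimal number of task executors.
+ */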
+ private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups) {
+ final List<SlotInfo> pickedSlots = new ArrayList<>();
+ final Iterator<TaskManagerLocation> sortedTaskExecutors =
+ getSortedTaskExecutors(slotsByTaskExecutor);
+ while (pickedSlots.size() < requestedGroups) {
+ Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+ pickedSlots.addAll(slotInfos);
+ }
+ return pickedSlots;
+ }
+
+ private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
getSlotsPerTaskExecutor(
+ Collection<? extends SlotInfo> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ SlotInfo::getTaskManagerLocation,
+ Collectors.mapping(identity(),
Collectors.toSet())));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 4b002ced5b0..425159cfe0f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,6 +47,9 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotSharingGroupMetaInfos;
+
/** {@link SlotAllocator} implementation that supports slot sharing. */
public class SlotSharingSlotAllocator implements SlotAllocator {
@@ -53,28 +57,38 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
private final FreeSlotFunction freeSlotFunction;
private final IsSlotAvailableAndFreeFunction
isSlotAvailableAndFreeFunction;
private final boolean localRecoveryEnabled;
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
private SlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
- boolean localRecoveryEnabled) {
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
this.reserveSlotFunction = reserveSlot;
this.freeSlotFunction = freeSlotFunction;
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
this.localRecoveryEnabled = localRecoveryEnabled;
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
- boolean localRecoveryEnabled) {
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return new SlotSharingSlotAllocator(
reserveSlot,
freeSlotFunction,
isSlotAvailableAndFreeFunction,
- localRecoveryEnabled);
+ localRecoveryEnabled,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
@Override
@@ -93,12 +107,9 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo =
- SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ getSlotSharingGroupMetaInfos(jobInformation);
- final int minimumRequiredSlots =
- slotSharingGroupMetaInfo.values().stream()
- .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
- .reduce(0, Integer::sum);
+ final int minimumRequiredSlots =
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
if (minimumRequiredSlots > freeSlots.size()) {
return Optional.empty();
@@ -140,7 +151,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
SlotAssigner slotAssigner =
localRecoveryEnabled &&
!jobAllocationsInformation.isEmpty()
? new StateLocalitySlotAssigner()
- : new DefaultSlotAssigner();
+ : new DefaultSlotAssigner(
+ executionTarget,
minimalTaskManagerPreferred);
return new JobSchedulingPlan(
parallelism,
slotAssigner.assignSlots(
@@ -306,7 +318,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
}
- private static class SlotSharingGroupMetaInfo {
+ static class SlotSharingGroupMetaInfo {
private final int minLowerBound;
private final int maxLowerBound;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 4e2a807c062..dcad351ed32 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -42,7 +42,8 @@ import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
-import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
import static org.apache.flink.util.Preconditions.checkState;
/** A {@link SlotAssigner} that assigns slots based on the number of local key
groups. */
@@ -94,11 +95,7 @@ public class StateLocalitySlotAssigner implements
SlotAssigner {
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- checkState(
- freeSlots.size() >=
jobInformation.getSlotSharingGroups().size(),
- "Not enough slots to allocate all the slot sharing groups
(have: %s, need: %s)",
- freeSlots.size(),
- jobInformation.getSlotSharingGroups().size());
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index e5e844aab9a..01de94f6d57 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -230,7 +232,11 @@ public class AdaptiveSchedulerBuilder {
slotAllocator == null
?
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
declarativeSlotPool,
-
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY))
+
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+
jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+ JobManagerOptions
+
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
: slotAllocator,
executorService,
userCodeLoader,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
new file mode 100644
index 00000000000..36e201a0499
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+ private static final TaskManagerLocation tml1 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+ private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+ private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+ private static final TaskManagerLocation tml2 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+ private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+ private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+ private static final TaskManagerLocation tml3 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+ private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+ private static final List<SlotInfo> allSlots =
+ Arrays.asList(
+ slot1OfTml1,
+ slot2OfTml1,
+ slot3OfTml1,
+ slot1OfTml2,
+ slot2OfTml2,
+ slot3OfTml2,
+ slot1OfTml3,
+ slot2OfTml3);
+
+ @Parameter int parallelism;
+
+ @Parameter(value = 1)
+ Collection<? extends SlotInfo> freeSlots;
+
+ @Parameter(value = 2)
+ List<TaskManagerLocation> expectedTaskManagerLocations;
+
+ @TestTemplate
+ void testPickSlotsIfNeeded() {
+ final DefaultSlotAssigner slotAssigner =
+ new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET,
true);
+ final Set<TaskManagerLocation> keptTaskExecutors =
+ slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
+ .map(SlotInfo::getTaskManagerLocation)
+ .collect(Collectors.toSet());
+ assertThat(expectedTaskManagerLocations)
+ .containsExactlyInAnyOrderElementsOf(keptTaskExecutors);
+ }
+
+ @Parameters(name = "parallelism={0}, freeSlots={1},
expectedTaskManagerLocations={2}")
+ private static Collection<Object[]> getTestingParameters() {
+ return Arrays.asList(
+ new Object[] {
+ 4,
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ Arrays.asList(tml1, tml2, tml3)
+ },
+ new Object[] {
+ 2,
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ singletonList(tml1)
+ },
+ new Object[] {
+ 3,
+ Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2,
slot3OfTml2),
+ Arrays.asList(tml2)
+ },
+ new Object[] {7, allSlots, Arrays.asList(tml1, tml2, tml3)});
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 5311571f7a6..6121948e353 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -62,6 +62,8 @@ class SlotSharingSlotAllocatorTest {
private static final IsSlotAvailableAndFreeFunction
TEST_IS_SLOT_FREE_FUNCTION =
ignored -> true;
private static final boolean DISABLE_LOCAL_RECOVERY = false;
+ private static final String NULL_EXECUTION_TARGET = null;
+ // NOTE(review): the name reads as "disabled", yet the value is true, whereas the sibling
+ // DISABLE_LOCAL_RECOVERY constant is false. Confirm which value these tests are meant to pass.
+ private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED =
true;
private static final SlotSharingGroup slotSharingGroup1 = new
SlotSharingGroup();
private static final SlotSharingGroup slotSharingGroup2 = new
SlotSharingGroup();
@@ -79,7 +81,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final ResourceCounter resourceCounter =
slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1,
vertex2, vertex3));
@@ -98,7 +102,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -118,7 +124,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -141,7 +149,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
final JobInformation.VertexInformation vertex11 =
new TestVertexInformation(new JobVertexID(), 4,
slotSharingGroup1);
@@ -175,7 +185,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -193,7 +205,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 1, 8, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -222,7 +236,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -244,7 +260,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 2, 2,
slotSharingGroup);
@@ -266,7 +284,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 10, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -297,7 +317,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -328,7 +350,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -372,7 +396,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
ignored -> false,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -429,7 +455,9 @@ class SlotSharingSlotAllocatorTest {
TestingPhysicalSlot.builder().build(),
(allocationID, cause, ts) -> {},
id -> false,
- true)
+ true,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
.determineParallelismAndCalculateAssignment(
new TestJobInformation(Arrays.asList(vertex1,
vertex2, vertex3)),
freeSlots,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
index 9f3573a0705..fb9161df25e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
@@ -28,6 +28,7 @@ public class TestSlotInfo implements SlotInfo {
private final AllocationID allocationId;
private final ResourceProfile resourceProfile;
+ private final TaskManagerLocation taskManagerLocation;
public TestSlotInfo() {
this(new AllocationID(), ResourceProfile.ANY);
@@ -41,9 +42,21 @@ public class TestSlotInfo implements SlotInfo {
this(new AllocationID(), resourceProfile);
}
+ /**
+ * Creates a test slot at the given location, using a new {@link AllocationID} and
+ * {@link ResourceProfile#ANY}.
+ */
+ public TestSlotInfo(TaskManagerLocation tml) {
+ this(new AllocationID(), ResourceProfile.ANY, tml);
+ }
+
+ /**
+ * Creates a test slot with the given allocation id and resource profile, placed at a newly
+ * created {@link LocalTaskManagerLocation}.
+ */
public TestSlotInfo(AllocationID allocationId, ResourceProfile
resourceProfile) {
+ this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+ }
+
+ /** Stores the given allocation id, resource profile and task manager location as-is. */
+ private TestSlotInfo(
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ TaskManagerLocation taskManagerLocation) {
this.allocationId = allocationId;
this.resourceProfile = resourceProfile;
+ this.taskManagerLocation = taskManagerLocation;
}
@Override
@@ -53,7 +66,7 @@ public class TestSlotInfo implements SlotInfo {
@Override
public TaskManagerLocation getTaskManagerLocation() {
+ // Return the location fixed at construction rather than a new instance on every call.
- return new LocalTaskManagerLocation();
+ return taskManagerLocation;
}
@Override