This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new 8348f7d96c7 [FLINK-33977][runtime] Support minimize TM number during
downscaling in adaptive scheduler
8348f7d96c7 is described below
commit 8348f7d96c73418beb8fea26de698566306cc10e
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu May 15 23:27:08 2025 +0800
[FLINK-33977][runtime] Support minimize TM number during downscaling in
adaptive scheduler
Also enable this strategy by default via the introduced config option
---
.../generated/all_jobmanager_section.html | 6 ++
.../generated/expert_scheduling_section.html | 6 ++
.../generated/job_manager_configuration.html | 6 ++
.../flink/configuration/JobManagerOptions.java | 27 +++++
.../adaptive/AdaptiveSchedulerFactory.java | 17 ++-
...DefaultSlotAssigner.java => AllocatorUtil.java} | 57 +++++-----
.../adaptive/allocator/DefaultSlotAssigner.java | 118 +++++++++++++++++----
.../allocator/SlotSharingSlotAllocator.java | 34 ++++--
.../allocator/StateLocalitySlotAssigner.java | 9 +-
.../adaptive/AdaptiveSchedulerBuilder.java | 8 +-
.../allocator/DefaultSlotAssignerTest.java | 109 +++++++++++++++++++
.../allocator/SlotSharingSlotAllocatorTest.java | 56 +++++++---
.../scheduler/adaptive/allocator/TestSlotInfo.java | 15 ++-
13 files changed, 386 insertions(+), 82 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 626aa50f386..aa2bfee81f9 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -14,6 +14,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">state.backend.local-recovery</code> is not enabled.
More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 0c9c877c844..e85161c81fb 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -68,6 +68,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">state.backend.local-recovery</code> is not enabled.
More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index b82ae03c3d7..a948f7e3aff 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -14,6 +14,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">state.backend.local-recovery</code> is not enabled.
More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1e6c68c3506..be0ff7f2f94 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,7 @@
package org.apache.flink.configuration;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
@@ -26,6 +27,7 @@ import
org.apache.flink.configuration.description.InlineElement;
import java.time.Duration;
+import static
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
import static org.apache.flink.configuration.ConfigOptions.key;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
@@ -678,6 +680,31 @@ public class JobManagerOptions {
code(SchedulerType.AdaptiveBatch.name()))
.build());
+ @Experimental
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean>
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+ key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "This parameter defines whether
the adaptive scheduler prioritizes "
+ + "using the minimum
number of %s when scheduling tasks.",
+ code("TaskManagers"))
+ .linebreak()
+ .text(
+ "Note, this parameter is suitable
if %s is not enabled. "
+ + "More details about this
configuration are available at %s.",
+ code(LOCAL_RECOVERY.key()),
+ link(
+
"https://issues.apache.org/jira/browse/FLINK-33977",
+ "FLINK-33977"))
+ .build());
+
/** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index a2552f1103a..6c5781397ea 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -47,6 +48,8 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -103,7 +106,11 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
jobGraph.getJobID());
final SlotSharingSlotAllocator slotAllocator =
- createSlotSharingSlotAllocator(declarativeSlotPool);
+ createSlotSharingSlotAllocator(
+ declarativeSlotPool,
+ jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -145,10 +152,14 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
- DeclarativeSlotPool declarativeSlotPool) {
+ DeclarativeSlotPool declarativeSlotPool,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot,
- declarativeSlotPool::containsFreeSlot);
+ declarativeSlotPool::containsFreeSlot,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
similarity index 55%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 0a8813b33a6..d516aae2596 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,46 +18,53 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
-public class DefaultSlotAssigner implements SlotAssigner {
+import static org.apache.flink.util.Preconditions.checkState;
- @Override
- public Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
- for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
-
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
- }
+/** The allocator util class. */
+class AllocatorUtil {
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
- Collection<SlotAssignment> assignments = new ArrayList<>();
- for (ExecutionSlotSharingGroup group : allGroups) {
- assignments.add(new SlotAssignment(iterator.next(), group));
- }
- return assignments;
+ private AllocatorUtil() {}
+
+ static Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
+ return
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ }
+
+ static int getMinimumRequiredSlots(
+ Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ slotSharingGroupMetaInfos) {
+ return slotSharingGroupMetaInfos.values().stream()
+
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
+ .reduce(0, Integer::sum);
+ }
+
+ static void checkMinimumRequiredSlots(
+ JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
+ final int minimumRequiredSlots =
+
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
+ checkState(
+ freeSlots.size() >= minimumRequiredSlots,
+ "Not enough slots to allocate all the execution slot sharing
groups (have: %s, need: %s)",
+ freeSlots.size(),
+ minimumRequiredSlots);
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
+ static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+ createExecutionSlotSharingGroups(
+ VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
slotSharingGroup
.getJobVertexIds()
@@ -71,7 +78,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
}
});
return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
+ .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
.collect(Collectors.toList());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 0a8813b33a6..2dc0f6dac56 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -18,37 +18,64 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
+import static java.util.function.Function.identity;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ *
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to
the minimal number of
+ * task managers.
+ */
public class DefaultSlotAssigner implements SlotAssigner {
+ @VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET =
"embedded";
+
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
+
+ DefaultSlotAssigner(@Nullable String executionTarget, boolean
minimalTaskManagerPreferred) {
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+ }
+
@Override
public Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ final Collection<? extends SlotInfo> pickedSlots =
+ pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+ Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
Collection<SlotAssignment> assignments = new ArrayList<>();
for (ExecutionSlotSharingGroup group : allGroups) {
assignments.add(new SlotAssignment(iterator.next(), group));
@@ -56,22 +83,67 @@ public class DefaultSlotAssigner implements SlotAssigner {
return assignments;
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
- final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
- slotSharingGroup
- .getJobVertexIds()
- .forEach(
- jobVertexId -> {
- int parallelism =
vertexParallelism.getParallelism(jobVertexId);
- for (int subtaskIdx = 0; subtaskIdx < parallelism;
subtaskIdx++) {
- sharedSlotToVertexAssignment
- .computeIfAbsent(subtaskIdx, ignored
-> new HashSet<>())
- .add(new
ExecutionVertexID(jobVertexId, subtaskIdx));
- }
- });
- return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
- .collect(Collectors.toList());
+ @VisibleForTesting
+ Collection<? extends SlotInfo> pickSlotsIfNeeded(
+ int requestExecutionSlotSharingGroups, Collection<? extends
SlotInfo> freeSlots) {
+ Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+ && minimalTaskManagerPreferred
+ // Skip the grouping and sorting work when all free slots are needed anyway.
+ && freeSlots.size() > requestExecutionSlotSharingGroups) {
+ final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor =
+ getSlotsPerTaskExecutor(freeSlots);
+ pickedSlots =
+ pickSlotsInMinimalTaskExecutors(
+ slotsPerTaskExecutor,
requestExecutionSlotSharingGroups);
+ }
+ return pickedSlots;
+ }
+
+ /**
+ * To minimize the number of task executors used at the resource manager side in application
+ * mode, and to release more task executors in a timely manner, slots are selected first from
+ * the task executors with the most available slots.
+ *
+ * @param slotsPerTaskExecutor The free slots grouped by task executor.
+ * @return The task executors ordered by their number of free slots, descending.
+ */
+ private Iterator<TaskManagerLocation> getSortedTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor) {
+ final Comparator<TaskManagerLocation> taskExecutorComparator =
+ (leftTml, rightTml) ->
+ Integer.compare(
+ slotsPerTaskExecutor.get(rightTml).size(),
+ slotsPerTaskExecutor.get(leftTml).size());
+ return
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
+ }
+
+ /**
+ * Pick the target slots to assign to the requested groups.
+ *
+ * @param slotsByTaskExecutor slots per task executor.
+ * @param requestedGroups the number of requested execution slot sharing groups.
+ * @return the picked slots, taken from the task executors with the most free slots.
+ */
+ private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups) {
+ final List<SlotInfo> pickedSlots = new ArrayList<>();
+ final Iterator<TaskManagerLocation> sortedTaskExecutors =
+ getSortedTaskExecutors(slotsByTaskExecutor);
+ while (pickedSlots.size() < requestedGroups) {
+ Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+ pickedSlots.addAll(slotInfos);
+ }
+ return pickedSlots;
+ }
+
+ private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
getSlotsPerTaskExecutor(
+ Collection<? extends SlotInfo> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ SlotInfo::getTaskManagerLocation,
+ Collectors.mapping(identity(),
Collectors.toSet())));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 8880bd2471e..6eae519a55b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,28 +47,43 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotSharingGroupMetaInfos;
+
/** {@link SlotAllocator} implementation that supports slot sharing. */
public class SlotSharingSlotAllocator implements SlotAllocator {
private final ReserveSlotFunction reserveSlotFunction;
private final FreeSlotFunction freeSlotFunction;
private final IsSlotAvailableAndFreeFunction
isSlotAvailableAndFreeFunction;
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
private SlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
- IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
+ IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
this.reserveSlotFunction = reserveSlot;
this.freeSlotFunction = freeSlotFunction;
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
- IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
+ IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return new SlotSharingSlotAllocator(
- reserveSlot, freeSlotFunction, isSlotAvailableAndFreeFunction);
+ reserveSlot,
+ freeSlotFunction,
+ isSlotAvailableAndFreeFunction,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
@Override
@@ -86,12 +102,9 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo =
- SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ getSlotSharingGroupMetaInfos(jobInformation);
- final int minimumRequiredSlots =
- slotSharingGroupMetaInfo.values().stream()
- .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
- .reduce(0, Integer::sum);
+ final int minimumRequiredSlots =
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
if (minimumRequiredSlots > freeSlots.size()) {
return Optional.empty();
@@ -132,7 +145,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
parallelism -> {
SlotAssigner slotAssigner =
jobAllocationsInformation.isEmpty()
- ? new DefaultSlotAssigner()
+ ? new DefaultSlotAssigner(
+ executionTarget,
minimalTaskManagerPreferred)
: new StateLocalitySlotAssigner();
return new JobSchedulingPlan(
parallelism,
@@ -299,7 +313,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
}
- private static class SlotSharingGroupMetaInfo {
+ static class SlotSharingGroupMetaInfo {
private final int minLowerBound;
private final int maxLowerBound;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 4e2a807c062..dcad351ed32 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -42,7 +42,8 @@ import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
-import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
import static org.apache.flink.util.Preconditions.checkState;
/** A {@link SlotAssigner} that assigns slots based on the number of local key
groups. */
@@ -94,11 +95,7 @@ public class StateLocalitySlotAssigner implements
SlotAssigner {
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- checkState(
- freeSlots.size() >=
jobInformation.getSlotSharingGroups().size(),
- "Not enough slots to allocate all the slot sharing groups
(have: %s, need: %s)",
- freeSlots.size(),
- jobInformation.getSlotSharingGroups().size());
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index fca3c7a8548..1682886a3bf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
@@ -228,7 +230,11 @@ public class AdaptiveSchedulerBuilder {
declarativeSlotPool,
slotAllocator == null
?
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
- declarativeSlotPool)
+ declarativeSlotPool,
+
jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+ JobManagerOptions
+
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
: slotAllocator,
executorService,
userCodeLoader,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
new file mode 100644
index 00000000000..36e201a0499
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+ private static final TaskManagerLocation tml1 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+ private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+ private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+ private static final TaskManagerLocation tml2 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+ private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+ private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+ private static final TaskManagerLocation tml3 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+ private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+ private static final List<SlotInfo> allSlots =
+ Arrays.asList(
+ slot1OfTml1,
+ slot2OfTml1,
+ slot3OfTml1,
+ slot1OfTml2,
+ slot2OfTml2,
+ slot3OfTml2,
+ slot1OfTml3,
+ slot2OfTml3);
+
+ @Parameter int parallelism;
+
+ @Parameter(value = 1)
+ Collection<? extends SlotInfo> freeSlots;
+
+ @Parameter(value = 2)
+ List<TaskManagerLocation> expectedTaskManagerLocations;
+
+ @TestTemplate
+ void testPickSlotsIfNeeded() {
+ final DefaultSlotAssigner slotAssigner =
+ new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET,
true);
+ final Set<TaskManagerLocation> keptTaskExecutors =
+ slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
+ .map(SlotInfo::getTaskManagerLocation)
+ .collect(Collectors.toSet());
+ assertThat(expectedTaskManagerLocations)
+ .containsExactlyInAnyOrderElementsOf(keptTaskExecutors);
+ }
+
+ @Parameters(name = "parallelism={0}, freeSlots={1},
expectedTaskManagerLocations={2}")
+ private static Collection<Object[]> getTestingParameters() {
+ return Arrays.asList(
+ new Object[] {
+ 4,
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ Arrays.asList(tml1, tml2, tml3)
+ },
+ new Object[] {
+ 2,
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ singletonList(tml1)
+ },
+ new Object[] {
+ 3,
+ Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2,
slot3OfTml2),
+ Arrays.asList(tml2)
+ },
+ new Object[] {7, allSlots, Arrays.asList(tml1, tml2, tml3)});
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index e6409553aa6..b210a673474 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -61,6 +61,8 @@ class SlotSharingSlotAllocatorTest {
.build();
private static final IsSlotAvailableAndFreeFunction
TEST_IS_SLOT_FREE_FUNCTION =
ignored -> true;
+ private static final String NULL_EXECUTION_TARGET = null;
+    private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED = false;
private static final SlotSharingGroup slotSharingGroup1 = new
SlotSharingGroup();
private static final SlotSharingGroup slotSharingGroup2 = new
SlotSharingGroup();
@@ -77,7 +79,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final ResourceCounter resourceCounter =
slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1,
vertex2, vertex3));
@@ -95,7 +99,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -114,7 +120,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -136,7 +144,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
final JobInformation.VertexInformation vertex11 =
new TestVertexInformation(new JobVertexID(), 4,
slotSharingGroup1);
@@ -169,7 +179,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -186,7 +198,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 1, 8, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -214,7 +228,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -235,7 +251,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 2, 2,
slotSharingGroup);
@@ -256,7 +274,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 10, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -286,7 +306,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -316,7 +338,9 @@ class SlotSharingSlotAllocatorTest {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
- TEST_IS_SLOT_FREE_FUNCTION);
+ TEST_IS_SLOT_FREE_FUNCTION,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -357,7 +381,11 @@ class SlotSharingSlotAllocatorTest {
void testReserveUnavailableResources() {
final SlotSharingSlotAllocator slotSharingSlotAllocator =
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
- TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION,
ignored -> false);
+ TEST_RESERVE_SLOT_FUNCTION,
+ TEST_FREE_SLOT_FUNCTION,
+ ignored -> false,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -413,7 +441,9 @@ class SlotSharingSlotAllocatorTest {
(allocationId, resourceProfile) ->
TestingPhysicalSlot.builder().build(),
(allocationID, cause, ts) -> {},
- id -> false)
+ id -> false,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
.determineParallelismAndCalculateAssignment(
new TestJobInformation(Arrays.asList(vertex1,
vertex2, vertex3)),
freeSlots,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
index 9f3573a0705..fb9161df25e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
@@ -28,6 +28,7 @@ public class TestSlotInfo implements SlotInfo {
private final AllocationID allocationId;
private final ResourceProfile resourceProfile;
+ private final TaskManagerLocation taskManagerLocation;
public TestSlotInfo() {
this(new AllocationID(), ResourceProfile.ANY);
@@ -41,9 +42,21 @@ public class TestSlotInfo implements SlotInfo {
this(new AllocationID(), resourceProfile);
}
+ public TestSlotInfo(TaskManagerLocation tml) {
+ this(new AllocationID(), ResourceProfile.ANY, tml);
+ }
+
public TestSlotInfo(AllocationID allocationId, ResourceProfile
resourceProfile) {
+ this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+ }
+
+ private TestSlotInfo(
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ TaskManagerLocation taskManagerLocation) {
this.allocationId = allocationId;
this.resourceProfile = resourceProfile;
+ this.taskManagerLocation = taskManagerLocation;
}
@Override
@@ -53,7 +66,7 @@ public class TestSlotInfo implements SlotInfo {
@Override
public TaskManagerLocation getTaskManagerLocation() {
- return new LocalTaskManagerLocation();
+ return taskManagerLocation;
}
@Override