This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ac4f3d182c [FLINK-33977][runtime] Support minimize TM number during
downscaling in adaptive scheduler
1ac4f3d182c is described below
commit 1ac4f3d182cba946663d69dc180a6875f17ab542
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu May 15 23:28:25 2025 +0800
[FLINK-33977][runtime] Support minimize TM number during downscaling in
adaptive scheduler
Also enable this strategy by default via the introduced config option
---
.../generated/all_jobmanager_section.html | 6 ++
.../generated/expert_scheduling_section.html | 6 ++
.../generated/job_manager_configuration.html | 6 ++
.../flink/configuration/JobManagerOptions.java | 27 +++++
.../adaptive/AdaptiveSchedulerFactory.java | 17 ++-
...DefaultSlotAssigner.java => AllocatorUtil.java} | 57 +++++-----
.../adaptive/allocator/DefaultSlotAssigner.java | 118 +++++++++++++++++----
.../allocator/SlotSharingSlotAllocator.java | 32 ++++--
.../allocator/StateLocalitySlotAssigner.java | 9 +-
.../adaptive/AdaptiveSchedulerBuilder.java | 8 +-
.../allocator/DefaultSlotAssignerTest.java | 109 +++++++++++++++++++
.../allocator/SlotSharingSlotAllocatorTest.java | 54 +++++++---
.../scheduler/adaptive/allocator/TestingSlot.java | 15 ++-
13 files changed, 382 insertions(+), 82 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index d1df33a72f3..33a16edd648 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -20,6 +20,12 @@
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling
operation after a resource change if only sufficient resources are available.
The scaling operation is performed immediately if the resources have changed
and the desired resources are available. The timeout begins as soon as either
the available resources or the job's resource requirements are changed.<br
/>The resource requirements of a running job can be changed using the <a
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index f03d0eabb73..24c547eb562 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -98,6 +98,12 @@
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling
operation after a resource change if only sufficient resources are available.
The scaling operation is performed immediately if the resources have changed
and the desired resources are available. The timeout begins as soon as either
the available resources or the job's resource requirements are changed.<br
/>The resource requirements of a running job can be changed using the <a
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index e423e49ab2f..c7669226f17 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -20,6 +20,12 @@
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling
operation after a resource change if only sufficient resources are available.
The scaling operation is performed immediately if the resources have changed
and the desired resources are available. The timeout begins as soon as either
the available resources or the job's resource requirements are changed.<br
/>The resource requirements of a running job can be changed using the <a
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>This parameter defines whether the adaptive scheduler
prioritizes using the minimum number of <code
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note,
this parameter is suitable if <code
class="highlighter-rouge">execution.state-recovery.from-local</code> is not
enabled. More details about this configuration are available at <a
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 13245fb87ea..85f048edd32 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,7 @@
package org.apache.flink.configuration;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
@@ -30,6 +31,7 @@ import static
org.apache.flink.configuration.ConfigOptions.key;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
import static
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
+import static
org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
@@ -708,6 +710,31 @@ public class JobManagerOptions {
code(SchedulerExecutionMode.REACTIVE.name()))
.build());
+ @Experimental
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean>
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+ key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "This parameter defines whether
the adaptive scheduler prioritizes "
+ + "using the minimum
number of %s when scheduling tasks.",
+ code("TaskManagers"))
+ .linebreak()
+ .text(
+ "Note, this parameter is suitable
if %s is not enabled. "
+ + "More details about this
configuration are available at %s.",
+ code(LOCAL_RECOVERY.key()),
+ link(
+
"https://issues.apache.org/jira/browse/FLINK-33977",
+ "FLINK-33977"))
+ .build());
+
/**
* Config parameter controlling whether partitions should already be
released during the job
* execution.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 38261d4d8df..9f182bf3587 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
@@ -50,6 +51,8 @@ import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.Executor;
@@ -117,7 +120,10 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
final SlotSharingSlotAllocator slotAllocator =
createSlotSharingSlotAllocator(
declarativeSlotPool,
-
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY));
+
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+ jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -160,11 +166,16 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
- DeclarativeSlotPool declarativeSlotPool, boolean
localRecoveryEnabled) {
+ DeclarativeSlotPool declarativeSlotPool,
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot,
declarativeSlotPool::containsFreeSlot,
- localRecoveryEnabled);
+ localRecoveryEnabled,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
similarity index 55%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 0a8813b33a6..d516aae2596 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,46 +18,53 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
-public class DefaultSlotAssigner implements SlotAssigner {
+import static org.apache.flink.util.Preconditions.checkState;
- @Override
- public Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
- for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
-
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
- }
+/** Utility methods shared by the slot allocator and the slot assigners of the adaptive scheduler. */
+class AllocatorUtil {
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
- Collection<SlotAssignment> assignments = new ArrayList<>();
- for (ExecutionSlotSharingGroup group : allGroups) {
- assignments.add(new SlotAssignment(iterator.next(), group));
- }
- return assignments;
+ private AllocatorUtil() {}
+
+ static Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
+ return
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ }
+
+ static int getMinimumRequiredSlots(
+ Map<SlotSharingGroupId,
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+ slotSharingGroupMetaInfos) {
+ return slotSharingGroupMetaInfos.values().stream()
+
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
+ .reduce(0, Integer::sum);
+ }
+
+ static void checkMinimumRequiredSlots(
+ JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
+ final int minimumRequiredSlots =
+
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
+ checkState(
+ freeSlots.size() >= minimumRequiredSlots,
+ "Not enough slots to allocate all the execution slot sharing
groups (have: %s, need: %s)",
+ freeSlots.size(),
+ minimumRequiredSlots);
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
+ static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+ createExecutionSlotSharingGroups(
+ VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
slotSharingGroup
.getJobVertexIds()
@@ -71,7 +78,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
}
});
return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
+ .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
.collect(Collectors.toList());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 0a8813b33a6..2dc0f6dac56 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -18,37 +18,64 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. */
+import static java.util.function.Function.identity;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ *
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to
the minimal number of
+ * task managers.
+ */
public class DefaultSlotAssigner implements SlotAssigner {
+ @VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET =
"embedded";
+
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
+
+ DefaultSlotAssigner(@Nullable String executionTarget, boolean
minimalTaskManagerPreferred) {
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+ }
+
@Override
public Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism,
slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ final Collection<? extends SlotInfo> pickedSlots =
+ pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+ Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
Collection<SlotAssignment> assignments = new ArrayList<>();
for (ExecutionSlotSharingGroup group : allGroups) {
assignments.add(new SlotAssignment(iterator.next(), group));
@@ -56,22 +83,67 @@ public class DefaultSlotAssigner implements SlotAssigner {
return assignments;
}
- static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
- VertexParallelism vertexParallelism, SlotSharingGroup
slotSharingGroup) {
- final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
- slotSharingGroup
- .getJobVertexIds()
- .forEach(
- jobVertexId -> {
- int parallelism =
vertexParallelism.getParallelism(jobVertexId);
- for (int subtaskIdx = 0; subtaskIdx < parallelism;
subtaskIdx++) {
- sharedSlotToVertexAssignment
- .computeIfAbsent(subtaskIdx, ignored
-> new HashSet<>())
- .add(new
ExecutionVertexID(jobVertexId, subtaskIdx));
- }
- });
- return sharedSlotToVertexAssignment.values().stream()
- .map(ExecutionSlotSharingGroup::new)
- .collect(Collectors.toList());
+ @VisibleForTesting
+ Collection<? extends SlotInfo> pickSlotsIfNeeded(
+ int requestExecutionSlotSharingGroups, Collection<? extends
SlotInfo> freeSlots) {
+ Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+ && minimalTaskManagerPreferred
+ // Skip the grouping and sorting work unless there are more free slots than requested groups.
+ && freeSlots.size() > requestExecutionSlotSharingGroups) {
+ final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor =
+ getSlotsPerTaskExecutor(freeSlots);
+ pickedSlots =
+ pickSlotsInMinimalTaskExecutors(
+ slotsPerTaskExecutor,
requestExecutionSlotSharingGroups);
+ }
+ return pickedSlots;
+ }
+
+ /**
+ * To minimize the number of task executors in use at the resource manager
+ * side in application mode, and to release more task executors in a timely
+ * manner, slots on the task executors with the most available slots are
+ * selected first.
+ *
+ * @param slotsPerTaskExecutor The slots per task manager.
+ * @return An iterator over the task managers, ordered by their number of
+ * free slots in descending order.
+ */
+ private Iterator<TaskManagerLocation> getSortedTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor) {
+ final Comparator<TaskManagerLocation> taskExecutorComparator =
+ (leftTml, rightTml) ->
+ Integer.compare(
+ slotsPerTaskExecutor.get(rightTml).size(),
+ slotsPerTaskExecutor.get(leftTml).size());
+ return
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
+ }
+
+ /**
+ * Picks the slots to assign to the requested groups.
+ *
+ * @param slotsByTaskExecutor slots per task executor.
+ * @param requestedGroups the number of requested execution slot sharing groups.
+ * @return the picked slots, taken from as few task executors as possible.
+ */
+ private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups) {
+ final List<SlotInfo> pickedSlots = new ArrayList<>();
+ final Iterator<TaskManagerLocation> sortedTaskExecutors =
+ getSortedTaskExecutors(slotsByTaskExecutor);
+ while (pickedSlots.size() < requestedGroups) {
+ Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+ pickedSlots.addAll(slotInfos);
+ }
+ return pickedSlots;
+ }
+
+ private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
getSlotsPerTaskExecutor(
+ Collection<? extends SlotInfo> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ SlotInfo::getTaskManagerLocation,
+ Collectors.mapping(identity(),
Collectors.toSet())));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 4b002ced5b0..425159cfe0f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,6 +47,9 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotSharingGroupMetaInfos;
+
/** {@link SlotAllocator} implementation that supports slot sharing. */
public class SlotSharingSlotAllocator implements SlotAllocator {
@@ -53,28 +57,38 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
private final FreeSlotFunction freeSlotFunction;
private final IsSlotAvailableAndFreeFunction
isSlotAvailableAndFreeFunction;
private final boolean localRecoveryEnabled;
+ private final @Nullable String executionTarget;
+ private final boolean minimalTaskManagerPreferred;
private SlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
- boolean localRecoveryEnabled) {
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
this.reserveSlotFunction = reserveSlot;
this.freeSlotFunction = freeSlotFunction;
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
this.localRecoveryEnabled = localRecoveryEnabled;
+ this.executionTarget = executionTarget;
+ this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
- boolean localRecoveryEnabled) {
+ boolean localRecoveryEnabled,
+ @Nullable String executionTarget,
+ boolean minimalTaskManagerPreferred) {
return new SlotSharingSlotAllocator(
reserveSlot,
freeSlotFunction,
isSlotAvailableAndFreeFunction,
- localRecoveryEnabled);
+ localRecoveryEnabled,
+ executionTarget,
+ minimalTaskManagerPreferred);
}
@Override
@@ -93,12 +107,9 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
JobInformation jobInformation, Collection<? extends SlotInfo>
freeSlots) {
final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfo =
- SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+ getSlotSharingGroupMetaInfos(jobInformation);
- final int minimumRequiredSlots =
- slotSharingGroupMetaInfo.values().stream()
- .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
- .reduce(0, Integer::sum);
+ final int minimumRequiredSlots =
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
if (minimumRequiredSlots > freeSlots.size()) {
return Optional.empty();
@@ -140,7 +151,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
SlotAssigner slotAssigner =
localRecoveryEnabled &&
!jobAllocationsInformation.isEmpty()
? new StateLocalitySlotAssigner()
- : new DefaultSlotAssigner();
+ : new DefaultSlotAssigner(
+ executionTarget,
minimalTaskManagerPreferred);
return new JobSchedulingPlan(
parallelism,
slotAssigner.assignSlots(
@@ -306,7 +318,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
}
- private static class SlotSharingGroupMetaInfo {
+ static class SlotSharingGroupMetaInfo {
private final int minLowerBound;
private final int maxLowerBound;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index a9f119c5607..458b15c1041 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -43,7 +43,8 @@ import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
-import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
import static org.apache.flink.util.Preconditions.checkState;
/** A {@link SlotAssigner} that assigns slots based on the number of local key
groups. */
@@ -95,11 +96,7 @@ public class StateLocalitySlotAssigner implements
SlotAssigner {
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- checkState(
- freeSlots.size() >=
jobInformation.getSlotSharingGroups().size(),
- "Not enough slots to allocate all the slot sharing groups
(have: %s, need: %s)",
- freeSlots.size(),
- jobInformation.getSlotSharingGroups().size());
+ checkMinimumRequiredSlots(jobInformation, freeSlots);
final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 5762528bb96..b6626e97ace 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -272,7 +274,11 @@ public class AdaptiveSchedulerBuilder {
slotAllocator == null
?
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
declarativeSlotPool,
-
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY))
+
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+
jobMasterConfiguration.get(DeploymentOptions.TARGET),
+ jobMasterConfiguration.get(
+ JobManagerOptions
+
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
: slotAllocator,
executorService,
userCodeLoader,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
new file mode 100644
index 00000000000..60a1067efb0
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+ private static final TaskManagerLocation tml1 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml1 = new TestingSlot(tml1);
+ private static final SlotInfo slot2OfTml1 = new TestingSlot(tml1);
+ private static final SlotInfo slot3OfTml1 = new TestingSlot(tml1);
+
+ private static final TaskManagerLocation tml2 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml2 = new TestingSlot(tml2);
+ private static final SlotInfo slot2OfTml2 = new TestingSlot(tml2);
+ private static final SlotInfo slot3OfTml2 = new TestingSlot(tml2);
+
+ private static final TaskManagerLocation tml3 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml3 = new TestingSlot(tml3);
+ private static final SlotInfo slot2OfTml3 = new TestingSlot(tml3);
+
+ private static final List<SlotInfo> allSlots =
+ Arrays.asList(
+ slot1OfTml1,
+ slot2OfTml1,
+ slot3OfTml1,
+ slot1OfTml2,
+ slot2OfTml2,
+ slot3OfTml2,
+ slot1OfTml3,
+ slot2OfTml3);
+
+ @Parameter int parallelism;
+
+ @Parameter(value = 1)
+ Collection<? extends SlotInfo> freeSlots;
+
+ @Parameter(value = 2)
+ List<TaskManagerLocation> expectedTaskManagerLocations;
+
+ @TestTemplate
+ void testPickSlotsIfNeeded() {
+ final DefaultSlotAssigner slotAssigner =
+ new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET,
true);
+ final Set<TaskManagerLocation> keptTaskExecutors = // distinct TaskManagers behind the picked slots
+ slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
+ .map(SlotInfo::getTaskManagerLocation)
+ .collect(Collectors.toSet());
+ assertThat(keptTaskExecutors) // computed value goes first so failure output labels actual/expected correctly
+ .containsExactlyInAnyOrderElementsOf(expectedTaskManagerLocations);
+ }
+
+ @Parameters(name = "parallelism={0}, freeSlots={1},
expectedTaskManagerLocations={2}")
+ private static Collection<Object[]> getTestingParameters() {
+ return Arrays.asList(
+ new Object[] {
+ 4, // as many slots requested as are free: every TaskManager is kept
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ Arrays.asList(tml1, tml2, tml3)
+ },
+ new Object[] {
+ 2, // tml1 offers two free slots, so it alone is expected
+ Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2,
slot2OfTml3),
+ singletonList(tml1)
+ },
+ new Object[] {
+ 3, // tml2 offers three free slots, so it alone is expected
+ Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2,
slot3OfTml2),
+ singletonList(tml2)
+ },
+ new Object[] {7, allSlots, Arrays.asList(tml1, tml2, tml3)}); // 7 of 8 slots still spans all three TaskManagers
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 9747193eac4..f6260f4fb7e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -62,6 +62,8 @@ class SlotSharingSlotAllocatorTest {
private static final IsSlotAvailableAndFreeFunction
TEST_IS_SLOT_FREE_FUNCTION =
ignored -> true;
private static final boolean DISABLE_LOCAL_RECOVERY = false;
+ private static final String NULL_EXECUTION_TARGET = null;
+ private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED =
false; // "disabled" means the prefer-minimal-TaskManagers flag is off, mirroring DISABLE_LOCAL_RECOVERY = false
private static final SlotSharingGroup slotSharingGroup1 = new
SlotSharingGroup();
private static final SlotSharingGroup slotSharingGroup2 = new
SlotSharingGroup();
@@ -79,7 +81,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final ResourceCounter resourceCounter =
slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1,
vertex2, vertex3));
@@ -98,7 +102,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -118,7 +124,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -141,7 +149,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
final JobInformation.VertexInformation vertex11 =
new TestVertexInformation(new JobVertexID(), 4,
slotSharingGroup1);
@@ -175,7 +185,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -193,7 +205,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 1, 8, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -222,7 +236,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -244,7 +260,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 2, 2,
slotSharingGroup);
@@ -266,7 +284,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 10, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -297,7 +317,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -328,7 +350,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -372,7 +396,9 @@ class SlotSharingSlotAllocatorTest {
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
ignored -> false,
- DISABLE_LOCAL_RECOVERY);
+ DISABLE_LOCAL_RECOVERY,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -438,7 +464,9 @@ class SlotSharingSlotAllocatorTest {
TestingPhysicalSlot.builder().build(),
(allocationID, cause, ts) -> {},
id -> false,
- true)
+ true,
+ NULL_EXECUTION_TARGET,
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
.determineParallelismAndCalculateAssignment(
new TestJobInformation(Arrays.asList(vertex1,
vertex2, vertex3)),
freeSlots,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
index 85133a83678..0976ac4881e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
@@ -29,6 +29,7 @@ public class TestingSlot implements PhysicalSlot {
private final AllocationID allocationId;
private final ResourceProfile resourceProfile;
+ private final TaskManagerLocation taskManagerLocation;
public TestingSlot() {
this(new AllocationID(), ResourceProfile.ANY);
@@ -42,9 +43,21 @@ public class TestingSlot implements PhysicalSlot {
this(new AllocationID(), resourceProfile);
}
+ public TestingSlot(TaskManagerLocation tml) {
+ this(new AllocationID(), ResourceProfile.ANY, tml);
+ }
+
public TestingSlot(AllocationID allocationId, ResourceProfile
resourceProfile) {
+ this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+ }
+
+ private TestingSlot(
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ TaskManagerLocation taskManagerLocation) {
this.allocationId = allocationId;
this.resourceProfile = resourceProfile;
+ this.taskManagerLocation = taskManagerLocation;
}
@Override
@@ -54,7 +67,7 @@ public class TestingSlot implements PhysicalSlot {
@Override
public TaskManagerLocation getTaskManagerLocation() {
- return new LocalTaskManagerLocation();
+ return taskManagerLocation;
}
@Override