This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 8cd774f7fcf [FLINK-33977][runtime] Support minimize TM number during 
downscaling in adaptive scheduler
8cd774f7fcf is described below

commit 8cd774f7fcf71275a19fc0d7b5bb21a1ad90bd98
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu May 15 23:25:10 2025 +0800

    [FLINK-33977][runtime] Support minimize TM number during downscaling in 
adaptive scheduler
    
    Also enable this strategy by default via the introduced config option
---
 .../generated/all_jobmanager_section.html          |   6 ++
 .../generated/expert_scheduling_section.html       |   6 ++
 .../generated/job_manager_configuration.html       |   6 ++
 .../flink/configuration/JobManagerOptions.java     |  27 +++++
 .../adaptive/AdaptiveSchedulerFactory.java         |  17 ++-
 ...DefaultSlotAssigner.java => AllocatorUtil.java} |  57 +++++-----
 .../adaptive/allocator/DefaultSlotAssigner.java    | 118 +++++++++++++++++----
 .../allocator/SlotSharingSlotAllocator.java        |  32 ++++--
 .../allocator/StateLocalitySlotAssigner.java       |   9 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |   8 +-
 .../allocator/DefaultSlotAssignerTest.java         | 109 +++++++++++++++++++
 .../allocator/SlotSharingSlotAllocatorTest.java    |  54 +++++++---
 .../scheduler/adaptive/allocator/TestSlotInfo.java |  15 ++-
 13 files changed, 382 insertions(+), 82 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 626aa50f386..e9efa60e78e 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -14,6 +14,12 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 86682f4fbfd..44495201f8a 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -92,6 +92,12 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index c01601a031a..c8bd7f381f8 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -14,6 +14,12 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 77bfce2a9f8..ed179cc12aa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
@@ -30,6 +31,7 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
+import static 
org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY;
 import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -677,6 +679,31 @@ public class JobManagerOptions {
                                             
code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    @Experimental
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> 
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+            key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This parameter defines whether 
the adaptive scheduler prioritizes "
+                                                    + "using the minimum 
number of %s when scheduling tasks.",
+                                            code("TaskManagers"))
+                                    .linebreak()
+                                    .text(
+                                            "Note, this parameter is suitable 
if %s is not enabled. "
+                                                    + "More details about this 
configuration are available at %s.",
+                                            code(LOCAL_RECOVERY.key()),
+                                            link(
+                                                    
"https://issues.apache.org/jira/browse/FLINK-33977",
+                                                    "FLINK-33977"))
+                                    .build());
+
     /** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
     @Deprecated
     @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index d707700149a..3dd0c0a014b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.failure.FailureEnricher;
@@ -48,6 +49,8 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -106,7 +109,10 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
         final SlotSharingSlotAllocator slotAllocator =
                 createSlotSharingSlotAllocator(
                         declarativeSlotPool,
-                        
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY));
+                        
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+                        jobMasterConfiguration.get(DeploymentOptions.TARGET),
+                        jobMasterConfiguration.get(
+                                
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
 
         final ExecutionGraphFactory executionGraphFactory =
                 new DefaultExecutionGraphFactory(
@@ -148,11 +154,16 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
     }
 
     public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
-            DeclarativeSlotPool declarativeSlotPool, boolean 
localRecoveryEnabled) {
+            DeclarativeSlotPool declarativeSlotPool,
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                 declarativeSlotPool::reserveFreeSlot,
                 declarativeSlotPool::freeReservedSlot,
                 declarativeSlotPool::containsFreeSlot,
-                localRecoveryEnabled);
+                localRecoveryEnabled,
+                executionTarget,
+                minimalTaskManagerPreferred);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
similarity index 55%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 0a8813b33a6..d516aae2596 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,46 +18,53 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
-public class DefaultSlotAssigner implements SlotAssigner {
+import static org.apache.flink.util.Preconditions.checkState;
 
-    @Override
-    public Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism,
-            JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
-        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
-            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
-        }
+/** The allocator util class. */
+class AllocatorUtil {
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
-        Collection<SlotAssignment> assignments = new ArrayList<>();
-        for (ExecutionSlotSharingGroup group : allGroups) {
-            assignments.add(new SlotAssignment(iterator.next(), group));
-        }
-        return assignments;
+    private AllocatorUtil() {}
+
+    static Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+            getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
+        return 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+    }
+
+    static int getMinimumRequiredSlots(
+            Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+                    slotSharingGroupMetaInfos) {
+        return slotSharingGroupMetaInfos.values().stream()
+                
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
+                .reduce(0, Integer::sum);
+    }
+
+    static void checkMinimumRequiredSlots(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        final int minimumRequiredSlots =
+                
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
+        checkState(
+                freeSlots.size() >= minimumRequiredSlots,
+                "Not enough slots to allocate all the execution slot sharing 
groups (have: %s, need: %s)",
+                freeSlots.size(),
+                minimumRequiredSlots);
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
+    static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+            createExecutionSlotSharingGroups(
+                    VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
         final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
         slotSharingGroup
                 .getJobVertexIds()
@@ -71,7 +78,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
                             }
                         });
         return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 0a8813b33a6..2dc0f6dac56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -18,37 +18,64 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ * 
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to 
the minimal number of
+ * task managers.
+ */
 public class DefaultSlotAssigner implements SlotAssigner {
 
+    @VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET = 
"embedded";
+
+    private final @Nullable String executionTarget;
+    private final boolean minimalTaskManagerPreferred;
+
+    DefaultSlotAssigner(@Nullable String executionTarget, boolean 
minimalTaskManagerPreferred) {
+        this.executionTarget = executionTarget;
+        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+    }
+
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
             
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        final Collection<? extends SlotInfo> pickedSlots =
+                pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+        Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
         Collection<SlotAssignment> assignments = new ArrayList<>();
         for (ExecutionSlotSharingGroup group : allGroups) {
             assignments.add(new SlotAssignment(iterator.next(), group));
@@ -56,22 +83,67 @@ public class DefaultSlotAssigner implements SlotAssigner {
         return assignments;
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
-        slotSharingGroup
-                .getJobVertexIds()
-                .forEach(
-                        jobVertexId -> {
-                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
-                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
-                                sharedSlotToVertexAssignment
-                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
-                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
-                            }
-                        });
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
+    @VisibleForTesting
+    Collection<? extends SlotInfo> pickSlotsIfNeeded(
+            int requestExecutionSlotSharingGroups, Collection<? extends 
SlotInfo> freeSlots) {
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+                && minimalTaskManagerPreferred
+                // To avoid the sort-work loading.
+                && freeSlots.size() > requestExecutionSlotSharingGroups) {
+            final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor =
+                    getSlotsPerTaskExecutor(freeSlots);
+            pickedSlots =
+                    pickSlotsInMinimalTaskExecutors(
+                            slotsPerTaskExecutor, 
requestExecutionSlotSharingGroups);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * In order to minimize the using of task executors at the resource 
manager side in the
+     * application-mode and release more task executors in a timely manner, it 
is a good choice to
+     * prioritize selecting slots on task executors with the most available 
slots.
+     *
+     * @param slotsPerTaskExecutor The slots per task manager.
+     * @return The ordered task manager that orders by the number of free 
slots descending.
+     */
+    private Iterator<TaskManagerLocation> getSortedTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor) {
+        final Comparator<TaskManagerLocation> taskExecutorComparator =
+                (leftTml, rightTml) ->
+                        Integer.compare(
+                                slotsPerTaskExecutor.get(rightTml).size(),
+                                slotsPerTaskExecutor.get(leftTml).size());
+        return 
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    /**
+     * Pick the target slots to assign with the requested groups.
+     *
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing 
groups.
+     * @return the target slots that are distributed on the minimal task 
executors.
+     */
+    private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsByTaskExecutor,
+            int requestedGroups) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        final Iterator<TaskManagerLocation> sortedTaskExecutors =
+                getSortedTaskExecutors(slotsByTaskExecutor);
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), 
Collectors.toSet())));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 4b002ced5b0..425159cfe0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,6 +47,9 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotSharingGroupMetaInfos;
+
 /** {@link SlotAllocator} implementation that supports slot sharing. */
 public class SlotSharingSlotAllocator implements SlotAllocator {
 
@@ -53,28 +57,38 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final FreeSlotFunction freeSlotFunction;
     private final IsSlotAvailableAndFreeFunction 
isSlotAvailableAndFreeFunction;
     private final boolean localRecoveryEnabled;
+    private final @Nullable String executionTarget;
+    private final boolean minimalTaskManagerPreferred;
 
     private SlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
             FreeSlotFunction freeSlotFunction,
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
-            boolean localRecoveryEnabled) {
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         this.reserveSlotFunction = reserveSlot;
         this.freeSlotFunction = freeSlotFunction;
         this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
         this.localRecoveryEnabled = localRecoveryEnabled;
+        this.executionTarget = executionTarget;
+        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
     }
 
     public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
             FreeSlotFunction freeSlotFunction,
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
-            boolean localRecoveryEnabled) {
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         return new SlotSharingSlotAllocator(
                 reserveSlot,
                 freeSlotFunction,
                 isSlotAvailableAndFreeFunction,
-                localRecoveryEnabled);
+                localRecoveryEnabled,
+                executionTarget,
+                minimalTaskManagerPreferred);
     }
 
     @Override
@@ -93,12 +107,9 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
 
         final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
-                SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+                getSlotSharingGroupMetaInfos(jobInformation);
 
-        final int minimumRequiredSlots =
-                slotSharingGroupMetaInfo.values().stream()
-                        .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
-                        .reduce(0, Integer::sum);
+        final int minimumRequiredSlots = 
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
 
         if (minimumRequiredSlots > freeSlots.size()) {
             return Optional.empty();
@@ -140,7 +151,8 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                             SlotAssigner slotAssigner =
                                     localRecoveryEnabled && 
!jobAllocationsInformation.isEmpty()
                                             ? new StateLocalitySlotAssigner()
-                                            : new DefaultSlotAssigner();
+                                            : new DefaultSlotAssigner(
+                                                    executionTarget, 
minimalTaskManagerPreferred);
                             return new JobSchedulingPlan(
                                     parallelism,
                                     slotAssigner.assignSlots(
@@ -306,7 +318,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         }
     }
 
-    private static class SlotSharingGroupMetaInfo {
+    static class SlotSharingGroupMetaInfo {
 
         private final int minLowerBound;
         private final int maxLowerBound;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 4e2a807c062..dcad351ed32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -42,7 +42,8 @@ import java.util.stream.Collectors;
 
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
-import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
@@ -94,11 +95,7 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        checkState(
-                freeSlots.size() >= 
jobInformation.getSlotSharingGroups().size(),
-                "Not enough slots to allocate all the slot sharing groups 
(have: %s, need: %s)",
-                freeSlots.size(),
-                jobInformation.getSlotSharingGroups().size());
+        checkMinimumRequiredSlots(jobInformation, freeSlots);
 
         final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index e5e844aab9a..01de94f6d57 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -230,7 +232,11 @@ public class AdaptiveSchedulerBuilder {
                 slotAllocator == null
                         ? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
                                 declarativeSlotPool,
-                                jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY))
+                                jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+                                jobMasterConfiguration.get(DeploymentOptions.TARGET),
+                                jobMasterConfiguration.get(
+                                        JobManagerOptions
+                                                .SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
                         : slotAllocator,
                 executorService,
                 userCodeLoader,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
new file mode 100644
index 00000000000..36e201a0499
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@code pickSlotsIfNeeded} of {@link DefaultSlotAssigner}.
+ *
+ * <p>Each parameter set consists of the requested parallelism, the free slots offered to the
+ * assigner, and the task managers on which the picked slots are expected to be located.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+    // Fixture: three task managers offering three, three and two slots respectively.
+    private static final TaskManagerLocation tml1 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+    private static final TaskManagerLocation tml2 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+    private static final TaskManagerLocation tml3 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    @Parameter int parallelism;
+
+    @Parameter(value = 1)
+    Collection<? extends SlotInfo> freeSlots;
+
+    @Parameter(value = 2)
+    List<TaskManagerLocation> expectedTaskManagerLocations;
+
+    /**
+     * Verifies that the slots picked for the requested parallelism are located on exactly the
+     * expected task managers.
+     */
+    @TestTemplate
+    void testPickSlotsIfNeeded() {
+        // NOTE(review): 'true' presumably enables the minimal-task-manager preference; confirm
+        // against the DefaultSlotAssigner constructor.
+        final DefaultSlotAssigner slotAssigner =
+                new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, true);
+        final Set<TaskManagerLocation> keptTaskExecutors =
+                slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
+                        .map(SlotInfo::getTaskManagerLocation)
+                        .collect(Collectors.toSet());
+        // The actual value goes into assertThat() so failure messages are labelled correctly.
+        assertThat(keptTaskExecutors)
+                .containsExactlyInAnyOrderElementsOf(expectedTaskManagerLocations);
+    }
+
+    /** Provides {parallelism, freeSlots, expectedTaskManagerLocations} for each test run. */
+    @Parameters(name = "parallelism={0}, freeSlots={1}, expectedTaskManagerLocations={2}")
+    private static Collection<Object[]> getTestingParameters() {
+        return Arrays.asList(
+                // As many slots requested as offered: every task manager is needed.
+                new Object[] {
+                    4,
+                    Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2, slot2OfTml3),
+                    Arrays.asList(tml1, tml2, tml3)
+                },
+                // tml1 alone offers the two requested slots.
+                new Object[] {
+                    2,
+                    Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2, slot2OfTml3),
+                    singletonList(tml1)
+                },
+                // tml2 alone offers the three requested slots.
+                new Object[] {
+                    3,
+                    Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2, slot3OfTml2),
+                    singletonList(tml2)
+                },
+                // Seven of eight slots requested: no two task managers offer enough.
+                new Object[] {7, allSlots, Arrays.asList(tml1, tml2, tml3)});
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 5311571f7a6..6121948e353 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -62,6 +62,8 @@ class SlotSharingSlotAllocatorTest {
     private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION =
             ignored -> true;
     private static final boolean DISABLE_LOCAL_RECOVERY = false;
+    private static final String NULL_EXECUTION_TARGET = null;
+    private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED = true;
 
     private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
     private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
@@ -79,7 +81,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final ResourceCounter resourceCounter =
                 slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
@@ -98,7 +102,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -118,7 +124,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -141,7 +149,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex11 =
                 new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
@@ -175,7 +185,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -193,7 +205,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 1, 8, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -222,7 +236,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -244,7 +260,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 2, 2, slotSharingGroup);
@@ -266,7 +284,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 10, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -297,7 +317,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -328,7 +350,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -372,7 +396,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         ignored -> false,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -429,7 +455,9 @@ class SlotSharingSlotAllocatorTest {
                                         TestingPhysicalSlot.builder().build(),
                                 (allocationID, cause, ts) -> {},
                                 id -> false,
-                                true)
+                                true,
+                                NULL_EXECUTION_TARGET,
+                                MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
                         .determineParallelismAndCalculateAssignment(
                                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)),
                                 freeSlots,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
index 9f3573a0705..fb9161df25e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
@@ -28,6 +28,7 @@ public class TestSlotInfo implements SlotInfo {
 
     private final AllocationID allocationId;
     private final ResourceProfile resourceProfile;
+    private final TaskManagerLocation taskManagerLocation;
 
     public TestSlotInfo() {
         this(new AllocationID(), ResourceProfile.ANY);
@@ -41,9 +42,21 @@ public class TestSlotInfo implements SlotInfo {
         this(new AllocationID(), resourceProfile);
     }
 
+    public TestSlotInfo(TaskManagerLocation tml) {
+        this(new AllocationID(), ResourceProfile.ANY, tml);
+    }
+
     public TestSlotInfo(AllocationID allocationId, ResourceProfile resourceProfile) {
+        this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+    }
+
+    private TestSlotInfo(
+            AllocationID allocationId,
+            ResourceProfile resourceProfile,
+            TaskManagerLocation taskManagerLocation) {
         this.allocationId = allocationId;
         this.resourceProfile = resourceProfile;
+        this.taskManagerLocation = taskManagerLocation;
     }
 
     @Override
@@ -53,7 +66,7 @@ public class TestSlotInfo implements SlotInfo {
 
     @Override
     public TaskManagerLocation getTaskManagerLocation() {
-        return new LocalTaskManagerLocation();
+        return taskManagerLocation;
     }
 
     @Override

Reply via email to