This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new c691899859b [FLINK-33977][runtime] Support minimize TM number during 
downscaling in adaptive scheduler
c691899859b is described below

commit c691899859b8caacf63251ae4de89012ade3836d
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu May 15 23:27:58 2025 +0800

    [FLINK-33977][runtime] Support minimize TM number during downscaling in 
adaptive scheduler
    
    Also enable this strategy by default via the introduced config option
---
 .../generated/all_jobmanager_section.html          |   6 ++
 .../generated/expert_scheduling_section.html       |   6 ++
 .../generated/job_manager_configuration.html       |   6 ++
 .../flink/configuration/JobManagerOptions.java     |  27 +++++
 .../adaptive/AdaptiveSchedulerFactory.java         |  17 ++-
 ...DefaultSlotAssigner.java => AllocatorUtil.java} |  57 +++++-----
 .../adaptive/allocator/DefaultSlotAssigner.java    | 118 +++++++++++++++++----
 .../allocator/SlotSharingSlotAllocator.java        |  32 ++++--
 .../allocator/StateLocalitySlotAssigner.java       |   9 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |   8 +-
 .../allocator/DefaultSlotAssignerTest.java         | 109 +++++++++++++++++++
 .../allocator/SlotSharingSlotAllocatorTest.java    |  54 +++++++---
 .../scheduler/adaptive/allocator/TestingSlot.java  |  15 ++-
 13 files changed, 382 insertions(+), 82 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index d1df33a72f3..33a16edd648 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -20,6 +20,12 @@
             <td>Duration</td>
             <td>Defines the duration the JobManager delays the scaling 
operation after a resource change if only sufficient resources are available. 
The scaling operation is performed immediately if the resources have changed 
and the desired resources are available. The timeout begins as soon as either 
the available resources or the job's resource requirements are changed.<br 
/>The resource requirements of a running job can be changed using the <a 
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index ae2e5804e42..43f66529184 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -98,6 +98,12 @@
             <td>Duration</td>
             <td>Defines the duration the JobManager delays the scaling 
operation after a resource change if only sufficient resources are available. 
The scaling operation is performed immediately if the resources have changed 
and the desired resources are available. The timeout begins as soon as either 
the available resources or the job's resource requirements are changed.<br 
/>The resource requirements of a running job can be changed using the <a 
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index e423e49ab2f..c7669226f17 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -20,6 +20,12 @@
             <td>Duration</td>
             <td>Defines the duration the JobManager delays the scaling 
operation after a resource change if only sufficient resources are available. 
The scaling operation is performed immediately if the resources have changed 
and the desired resources are available. The timeout begins as soon as either 
the available resources or the job's resource requirements are changed.<br 
/>The resource requirements of a running job can be changed using the <a 
href="{{.Site.BaseURL}}{{.Site.LanguageP [...]
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>This parameter defines whether the adaptive scheduler 
prioritizes using the minimum number of <code 
class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, 
this parameter is suitable if <code 
class="highlighter-rouge">execution.state-recovery.from-local</code> is not 
enabled. More details about this configuration are available at <a 
href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 13245fb87ea..85f048edd32 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
@@ -30,6 +31,7 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
 import static 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
+import static 
org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY;
 import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -708,6 +710,31 @@ public class JobManagerOptions {
                                             
code(SchedulerExecutionMode.REACTIVE.name()))
                                     .build());
 
+    @Experimental
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> 
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+            key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This parameter defines whether 
the adaptive scheduler prioritizes "
+                                                    + "using the minimum 
number of %s when scheduling tasks.",
+                                            code("TaskManagers"))
+                                    .linebreak()
+                                    .text(
+                                            "Note, this parameter is suitable 
if %s is not enabled. "
+                                                    + "More details about this 
configuration are available at %s.",
+                                            code(LOCAL_RECOVERY.key()),
+                                            link(
+                                                    
"https://issues.apache.org/jira/browse/FLINK-33977",
+                                                    "FLINK-33977"))
+                                    .build());
+
     /**
      * Config parameter controlling whether partitions should already be 
released during the job
      * execution.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 38261d4d8df..9f182bf3587 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.failure.FailureEnricher;
@@ -50,6 +51,8 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.Executor;
@@ -117,7 +120,10 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
         final SlotSharingSlotAllocator slotAllocator =
                 createSlotSharingSlotAllocator(
                         declarativeSlotPool,
-                        
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY));
+                        
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+                        jobMasterConfiguration.get(DeploymentOptions.TARGET),
+                        jobMasterConfiguration.get(
+                                
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
 
         final ExecutionGraphFactory executionGraphFactory =
                 new DefaultExecutionGraphFactory(
@@ -160,11 +166,16 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
     }
 
     public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
-            DeclarativeSlotPool declarativeSlotPool, boolean 
localRecoveryEnabled) {
+            DeclarativeSlotPool declarativeSlotPool,
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                 declarativeSlotPool::reserveFreeSlot,
                 declarativeSlotPool::freeReservedSlot,
                 declarativeSlotPool::containsFreeSlot,
-                localRecoveryEnabled);
+                localRecoveryEnabled,
+                executionTarget,
+                minimalTaskManagerPreferred);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
similarity index 55%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 0a8813b33a6..d516aae2596 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,46 +18,53 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
-public class DefaultSlotAssigner implements SlotAssigner {
+import static org.apache.flink.util.Preconditions.checkState;
 
-    @Override
-    public Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism,
-            JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
-        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
-            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
-        }
+/** The allocator util class. */
+class AllocatorUtil {
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
-        Collection<SlotAssignment> assignments = new ArrayList<>();
-        for (ExecutionSlotSharingGroup group : allGroups) {
-            assignments.add(new SlotAssignment(iterator.next(), group));
-        }
-        return assignments;
+    private AllocatorUtil() {}
+
+    static Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+            getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
+        return 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+    }
+
+    static int getMinimumRequiredSlots(
+            Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
+                    slotSharingGroupMetaInfos) {
+        return slotSharingGroupMetaInfos.values().stream()
+                
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
+                .reduce(0, Integer::sum);
+    }
+
+    static void checkMinimumRequiredSlots(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        final int minimumRequiredSlots =
+                
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
+        checkState(
+                freeSlots.size() >= minimumRequiredSlots,
+                "Not enough slots to allocate all the execution slot sharing 
groups (have: %s, need: %s)",
+                freeSlots.size(),
+                minimumRequiredSlots);
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
+    static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+            createExecutionSlotSharingGroups(
+                    VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
         final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
         slotSharingGroup
                 .getJobVertexIds()
@@ -71,7 +78,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
                             }
                         });
         return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 0a8813b33a6..2dc0f6dac56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -18,37 +18,64 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ * 
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to 
the minimal number of
+ * task managers.
+ */
 public class DefaultSlotAssigner implements SlotAssigner {
 
+    @VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET = 
"embedded";
+
+    private final @Nullable String executionTarget;
+    private final boolean minimalTaskManagerPreferred;
+
+    DefaultSlotAssigner(@Nullable String executionTarget, boolean 
minimalTaskManagerPreferred) {
+        this.executionTarget = executionTarget;
+        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+    }
+
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
             
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        final Collection<? extends SlotInfo> pickedSlots =
+                pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+        Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
         Collection<SlotAssignment> assignments = new ArrayList<>();
         for (ExecutionSlotSharingGroup group : allGroups) {
             assignments.add(new SlotAssignment(iterator.next(), group));
@@ -56,22 +83,67 @@ public class DefaultSlotAssigner implements SlotAssigner {
         return assignments;
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
-        slotSharingGroup
-                .getJobVertexIds()
-                .forEach(
-                        jobVertexId -> {
-                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
-                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
-                                sharedSlotToVertexAssignment
-                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
-                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
-                            }
-                        });
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
+    @VisibleForTesting
+    Collection<? extends SlotInfo> pickSlotsIfNeeded(
+            int requestExecutionSlotSharingGroups, Collection<? extends 
SlotInfo> freeSlots) {
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+                && minimalTaskManagerPreferred
+                // To avoid the sort-work loading.
+                && freeSlots.size() > requestExecutionSlotSharingGroups) {
+            final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor =
+                    getSlotsPerTaskExecutor(freeSlots);
+            pickedSlots =
+                    pickSlotsInMinimalTaskExecutors(
+                            slotsPerTaskExecutor, 
requestExecutionSlotSharingGroups);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * In order to minimize the using of task executors at the resource 
manager side in the
+     * application-mode and release more task executors in a timely manner, it 
is a good choice to
+     * prioritize selecting slots on task executors with the most available 
slots.
+     *
+     * @param slotsPerTaskExecutor The slots per task manager.
+     * @return The ordered task manager that orders by the number of free 
slots descending.
+     */
+    private Iterator<TaskManagerLocation> getSortedTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor) {
+        final Comparator<TaskManagerLocation> taskExecutorComparator =
+                (leftTml, rightTml) ->
+                        Integer.compare(
+                                slotsPerTaskExecutor.get(rightTml).size(),
+                                slotsPerTaskExecutor.get(leftTml).size());
+        return 
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    /**
+     * Pick the target slots to assign with the requested groups.
+     *
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing 
groups.
+     * @return the target slots that are distributed on the minimal task 
executors.
+     */
+    private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsByTaskExecutor,
+            int requestedGroups) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        final Iterator<TaskManagerLocation> sortedTaskExecutors =
+                getSortedTaskExecutors(slotsByTaskExecutor);
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), 
Collectors.toSet())));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 4b002ced5b0..425159cfe0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,6 +47,9 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotSharingGroupMetaInfos;
+
 /** {@link SlotAllocator} implementation that supports slot sharing. */
 public class SlotSharingSlotAllocator implements SlotAllocator {
 
@@ -53,28 +57,38 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final FreeSlotFunction freeSlotFunction;
     private final IsSlotAvailableAndFreeFunction 
isSlotAvailableAndFreeFunction;
     private final boolean localRecoveryEnabled;
+    private final @Nullable String executionTarget;
+    private final boolean minimalTaskManagerPreferred;
 
     private SlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
             FreeSlotFunction freeSlotFunction,
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
-            boolean localRecoveryEnabled) {
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         this.reserveSlotFunction = reserveSlot;
         this.freeSlotFunction = freeSlotFunction;
         this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
         this.localRecoveryEnabled = localRecoveryEnabled;
+        this.executionTarget = executionTarget;
+        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
     }
 
     public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
             FreeSlotFunction freeSlotFunction,
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
-            boolean localRecoveryEnabled) {
+            boolean localRecoveryEnabled,
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred) {
         return new SlotSharingSlotAllocator(
                 reserveSlot,
                 freeSlotFunction,
                 isSlotAvailableAndFreeFunction,
-                localRecoveryEnabled);
+                localRecoveryEnabled,
+                executionTarget,
+                minimalTaskManagerPreferred);
     }
 
     @Override
@@ -93,12 +107,9 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
 
         final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
-                SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+                getSlotSharingGroupMetaInfos(jobInformation);
 
-        final int minimumRequiredSlots =
-                slotSharingGroupMetaInfo.values().stream()
-                        .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
-                        .reduce(0, Integer::sum);
+        final int minimumRequiredSlots = 
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
 
         if (minimumRequiredSlots > freeSlots.size()) {
             return Optional.empty();
@@ -140,7 +151,8 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                             SlotAssigner slotAssigner =
                                     localRecoveryEnabled && 
!jobAllocationsInformation.isEmpty()
                                             ? new StateLocalitySlotAssigner()
-                                            : new DefaultSlotAssigner();
+                                            : new DefaultSlotAssigner(
+                                                    executionTarget, 
minimalTaskManagerPreferred);
                             return new JobSchedulingPlan(
                                     parallelism,
                                     slotAssigner.assignSlots(
@@ -306,7 +318,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         }
     }
 
-    private static class SlotSharingGroupMetaInfo {
+    static class SlotSharingGroupMetaInfo {
 
         private final int minLowerBound;
         private final int maxLowerBound;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index a9f119c5607..458b15c1041 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -43,7 +43,8 @@ import java.util.stream.Collectors;
 
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
-import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
@@ -95,11 +96,7 @@ public class StateLocalitySlotAssigner implements SlotAssigner {
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        checkState(
-                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),
-                "Not enough slots to allocate all the slot sharing groups (have: %s, need: %s)",
-                freeSlots.size(),
-                jobInformation.getSlotSharingGroups().size());
+        checkMinimumRequiredSlots(jobInformation, freeSlots);
 
         final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 5762528bb96..b6626e97ace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -272,7 +274,11 @@ public class AdaptiveSchedulerBuilder {
                 slotAllocator == null
                         ? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
                                 declarativeSlotPool,
-                                jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY))
+                                jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
+                                jobMasterConfiguration.get(DeploymentOptions.TARGET),
+                                jobMasterConfiguration.get(
+                                        JobManagerOptions
+                                                .SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
                         : slotAllocator,
                 executorService,
                 userCodeLoader,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
new file mode 100644
index 00000000000..60a1067efb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+    private static final TaskManagerLocation tml1 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestingSlot(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestingSlot(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestingSlot(tml1);
+
+    private static final TaskManagerLocation tml2 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestingSlot(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestingSlot(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestingSlot(tml2);
+
+    private static final TaskManagerLocation tml3 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestingSlot(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestingSlot(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    @Parameter int parallelism;
+
+    @Parameter(value = 1)
+    Collection<? extends SlotInfo> freeSlots;
+
+    @Parameter(value = 2)
+    List<TaskManagerLocation> expectedTaskManagerLocations;
+
+    @TestTemplate
+    void testPickSlotsIfNeeded() {
+        final DefaultSlotAssigner slotAssigner =
+                new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, true);
+        final Set<TaskManagerLocation> keptTaskExecutors =
+                slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
+                        .map(SlotInfo::getTaskManagerLocation)
+                        .collect(Collectors.toSet());
+        assertThat(expectedTaskManagerLocations)
+                .containsExactlyInAnyOrderElementsOf(keptTaskExecutors);
+    }
+
+    @Parameters(name = "parallelism={0}, freeSlots={1}, expectedTaskManagerLocations={2}")
+    private static Collection<Object[]> getTestingParameters() {
+        return Arrays.asList(
+                new Object[] {
+                    4,
+                    Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2, slot2OfTml3),
+                    Arrays.asList(tml1, tml2, tml3)
+                },
+                new Object[] {
+                    2,
+                    Arrays.asList(slot1OfTml1, slot2OfTml1, slot1OfTml2, slot2OfTml3),
+                    singletonList(tml1)
+                },
+                new Object[] {
+                    3,
+                    Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2, slot3OfTml2),
+                    Arrays.asList(tml2)
+                },
+                new Object[] {7, allSlots, Arrays.asList(tml1, tml2, tml3)});
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 9747193eac4..f6260f4fb7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -62,6 +62,8 @@ class SlotSharingSlotAllocatorTest {
     private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION =
             ignored -> true;
     private static final boolean DISABLE_LOCAL_RECOVERY = false;
+    private static final String NULL_EXECUTION_TARGET = null;
+    private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED = true;
 
     private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
     private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
@@ -79,7 +81,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final ResourceCounter resourceCounter =
                 slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
@@ -98,7 +102,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -118,7 +124,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -141,7 +149,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex11 =
                 new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
@@ -175,7 +185,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -193,7 +205,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 1, 8, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -222,7 +236,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -244,7 +260,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 2, 2, slotSharingGroup);
@@ -266,7 +284,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 10, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -297,7 +317,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -328,7 +350,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -372,7 +396,9 @@ class SlotSharingSlotAllocatorTest {
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         ignored -> false,
-                        DISABLE_LOCAL_RECOVERY);
+                        DISABLE_LOCAL_RECOVERY,
+                        NULL_EXECUTION_TARGET,
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
@@ -438,7 +464,9 @@ class SlotSharingSlotAllocatorTest {
                                         TestingPhysicalSlot.builder().build(),
                                 (allocationID, cause, ts) -> {},
                                 id -> false,
-                                true)
+                                true,
+                                NULL_EXECUTION_TARGET,
+                                MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
                         .determineParallelismAndCalculateAssignment(
                                 new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)),
                                 freeSlots,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
index 85133a83678..0976ac4881e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
@@ -29,6 +29,7 @@ public class TestingSlot implements PhysicalSlot {
 
     private final AllocationID allocationId;
     private final ResourceProfile resourceProfile;
+    private final TaskManagerLocation taskManagerLocation;
 
     public TestingSlot() {
         this(new AllocationID(), ResourceProfile.ANY);
@@ -42,9 +43,21 @@ public class TestingSlot implements PhysicalSlot {
         this(new AllocationID(), resourceProfile);
     }
 
+    public TestingSlot(TaskManagerLocation tml) {
+        this(new AllocationID(), ResourceProfile.ANY, tml);
+    }
+
     public TestingSlot(AllocationID allocationId, ResourceProfile resourceProfile) {
+        this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+    }
+
+    private TestingSlot(
+            AllocationID allocationId,
+            ResourceProfile resourceProfile,
+            TaskManagerLocation taskManagerLocation) {
         this.allocationId = allocationId;
         this.resourceProfile = resourceProfile;
+        this.taskManagerLocation = taskManagerLocation;
     }
 
     @Override
@@ -54,7 +67,7 @@ public class TestingSlot implements PhysicalSlot {
 
     @Override
     public TaskManagerLocation getTaskManagerLocation() {
-        return new LocalTaskManagerLocation();
+        return taskManagerLocation;
     }
 
     @Override

Reply via email to