This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b6f0eb6517d [FLINK-38622][runtime] Enhance the requests and slots
balanced allocation logic in DefaultScheduler (#27189)
b6f0eb6517d is described below
commit b6f0eb6517d1fad186eb654e81aa358e564c6784
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu Nov 13 17:44:30 2025 +0800
[FLINK-38622][runtime] Enhance the requests and slots balanced allocation
logic in DefaultScheduler (#27189)
- Introduce ResourceRequestPreMappings to compute the resource matching
relationships when allocating all slots in bulk for balanced scheduling of
streaming jobs in the default scheduler.
- Introduce the test cases for ResourceRequestPreMappings.
- Adapt the calculation logic of the
TasksBalancedRequestSlotMatchingStrategy for bulk slot allocation using
ResourceRequestPreMappings, in order to prevent job scheduling timeouts caused
by untimely updates to the relationships between all requests and resources in
load-balancing scenarios
- Introduce TasksBalancedRequestSlotMatchingStrategyTest for enhancing the
TasksBalancedRequestSlotMatchingStrategy testing.
---
.../slotpool/ResourceRequestPreMappings.java | 412 +++++++++++++++++++++
.../TasksBalancedRequestSlotMatchingStrategy.java | 103 ++++--
...dAllocationRequestSlotMatchingStrategyTest.java | 2 +-
.../slotpool/ResourceRequestPreMappingsTest.java | 381 +++++++++++++++++++
...sksBalancedRequestSlotMatchingStrategyTest.java | 114 ++++++
5 files changed, 970 insertions(+), 42 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappings.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappings.java
new file mode 100644
index 00000000000..7aec0effe23
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappings.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class is designed to handle the pre-matching of resource requests in
the context of balanced
+ * task scheduling for streaming jobs. During the batch allocation of
resources, where resource
+ * requests are allocated in a single, non-interleaved operation, it is
impossible to make immediate
+ * individual adjustments to unmatched resource requests. This may lead to
situations where not all
+ * resource requests can be successfully fulfilled. For example:
+ *
+ * <pre>
+ * resource requests:
+ * - resource request-1: ResourceProfile-1(UNKNOWN)
+ * - resource request-2: ResourceProfile-2(cpu=2 core, memory=2G)
+ *
+ * available slots:
+ * - slot-a: ResourceProfile-a(cpu=1 core, memory=1G)
+ * - slot-b: ResourceProfile-b(cpu=2 core, memory=2G)
+ * </pre>
+ *
+ * When the strategy {@link TasksBalancedRequestSlotMatchingStrategy} performs
resource allocation,
+ * the following matching mapping might occur, preventing all slot requests
from being successfully
+ * assigned in a consistent manner and thus hindering the scheduling of the
entire job:
+ *
+ * <pre>
+ * the unexpected mapping case:
+ * - resource request-1: ResourceProfile-1(UNKNOWN) was matched with slot-b:
ResourceProfile-b(cpu=2 core, memory=2G)
+ * - resource request-2: ResourceProfile-2(cpu=2 core, memory=2G) was not
matched
+ * </pre>
+ *
+ * Therefore, it is crucial to determine how ResourceProfiles should match
before the batch
+ * allocation of resource requests, aiming to assure the allocation
successfully at least. An ideal
+ * matching relationship would be:
+ *
+ * <pre>
+ * - ResourceProfile-1(UNKNOWN) -> ResourceProfile-a(cpu=1 core,
memory=1G)
+ * - ResourceProfile-2(cpu=2 core, memory=2G) -> ResourceProfile-b(cpu=2 core,
memory=2G)
+ * </pre>
+ *
+ * This is the motivation for introducing the current class.
+ */
+final class ResourceRequestPreMappings {
+
+ private final boolean matchingFulfilled;
+ // The variable to keep base mappings result related information, which
can assure that
+ // the allocation for all requests could be run successfully at least.
+ private final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+ baseRequiredResourcePreMappings;
+ // The variable to keep the remaining available flexible resources besides
the
+ // baseRequiredResourcePreMappings.
+ private final Map<ResourceProfile, Integer> remainingFlexibleResources;
+
+ private ResourceRequestPreMappings(
+ boolean matchingFulfilled,
+ final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+ baseRequiredResourcePreMappings,
+ final Map<ResourceProfile, Integer> remainingFlexibleResources) {
+ this.matchingFulfilled = matchingFulfilled;
+
+ this.baseRequiredResourcePreMappings =
+
CollectionUtil.newHashMapWithExpectedSize(baseRequiredResourcePreMappings.size());
+
this.baseRequiredResourcePreMappings.putAll(baseRequiredResourcePreMappings);
+
+ this.remainingFlexibleResources =
+
CollectionUtil.newHashMapWithExpectedSize(remainingFlexibleResources.size());
+ this.remainingFlexibleResources.putAll(remainingFlexibleResources);
+ }
+
+ static ResourceRequestPreMappings createFrom(
+ Collection<PendingRequest> pendingRequests, Collection<? extends
PhysicalSlot> slots) {
+ return new ResourceRequestPreMappingsBuilder(pendingRequests,
slots).build();
+ }
+
+ boolean isMatchingFulfilled() {
+ return matchingFulfilled;
+ }
+
+ boolean hasAvailableProfile(
+ ResourceProfile requiredResourceProfile, ResourceProfile
acquirableResourceProfile) {
+ // Check for base mappings first
+ Map<ResourceProfile, Integer> basePreMapping =
+ baseRequiredResourcePreMappings.getOrDefault(
+ requiredResourceProfile, new HashMap<>());
+ Integer remainingCnt =
basePreMapping.getOrDefault(acquirableResourceProfile, 0);
+
+ if (remainingCnt > 0) {
+ return true;
+ } else {
+ return
remainingFlexibleResources.getOrDefault(acquirableResourceProfile, 0) > 0;
+ }
+ }
+
+ void decrease(
+ ResourceProfile requiredResourceProfile, ResourceProfile
acquiredResourceProfile) {
+ Map<ResourceProfile, Integer> basePreMapping =
+ baseRequiredResourcePreMappings.getOrDefault(
+ requiredResourceProfile, new HashMap<>());
+ Integer remainingCntOfBaseMappings =
+ basePreMapping.getOrDefault(acquiredResourceProfile, 0);
+ Integer remainingCntOfFlexibleResources =
+
remainingFlexibleResources.getOrDefault(acquiredResourceProfile, 0);
+
+ Preconditions.checkState(
+ remainingCntOfBaseMappings > 0 ||
remainingCntOfFlexibleResources > 0,
+ "Remaining acquired resource profile %s to match %s is not
enough.",
+ acquiredResourceProfile,
+ requiredResourceProfile);
+
+ if (remainingCntOfBaseMappings > 0) {
+ basePreMapping.put(acquiredResourceProfile,
remainingCntOfBaseMappings - 1);
+ return;
+ }
+
+ if (remainingCntOfFlexibleResources > 0) {
+ remainingFlexibleResources.put(
+ acquiredResourceProfile, remainingCntOfFlexibleResources -
1);
+ // release a resource back to remainingFlexibleResources.
+ adjustBaseToRemainingFlexibleResources(basePreMapping);
+ }
+ }
+
+ private void adjustBaseToRemainingFlexibleResources(
+ Map<ResourceProfile, Integer> basePreMapping) {
+ Optional<Map.Entry<ResourceProfile, Integer>>
releasableOptOfBaseMappings =
+ basePreMapping.entrySet().stream()
+ .filter(entry -> entry.getValue() > 0)
+ .findFirst();
+ Preconditions.checkState(
+ releasableOptOfBaseMappings.isPresent(),
+ "No releasable mapping found in the base mappings between
resources and requests.");
+ Map.Entry<ResourceProfile, Integer> releasable =
releasableOptOfBaseMappings.get();
+ ResourceProfile releasableResourceProfile = releasable.getKey();
+
+ basePreMapping.put(releasableResourceProfile, releasable.getValue() -
1);
+
+ remainingFlexibleResources.compute(
+ releasableResourceProfile,
+ (resourceProfile, oldValue) -> oldValue == null ? 1 : oldValue
+ 1);
+ }
+
+ @VisibleForTesting
+ static ResourceRequestPreMappings createFrom(
+ boolean allMatchable,
+ final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+ baseRequiredResourcePreMappings,
+ final Map<ResourceProfile, Integer> remainingFlexibleResources) {
+ return new ResourceRequestPreMappings(
+ allMatchable, baseRequiredResourcePreMappings,
remainingFlexibleResources);
+ }
+
+ @VisibleForTesting
+ Map<ResourceProfile, Map<ResourceProfile, Integer>>
getBaseRequiredResourcePreMappings() {
+ return Collections.unmodifiableMap(baseRequiredResourcePreMappings);
+ }
+
+ @VisibleForTesting
+ int getAvailableResourceCntOfBasePreMappings(
+ ResourceProfile requiredResourceProfile, ResourceProfile
acquirableResourceProfile) {
+ return baseRequiredResourcePreMappings
+ .getOrDefault(requiredResourceProfile, new HashMap<>())
+ .getOrDefault(acquirableResourceProfile, 0);
+ }
+
+ @VisibleForTesting
+ Map<ResourceProfile, Integer> getRemainingFlexibleResources() {
+ return Collections.unmodifiableMap(remainingFlexibleResources);
+ }
+
+ @VisibleForTesting
+ int getAvailableResourceCntOfRemainingFlexibleMapping(
+ ResourceProfile availableResourceProfile) {
+ return
remainingFlexibleResources.getOrDefault(availableResourceProfile, 0);
+ }
+
+ private static final class ResourceRequestPreMappingsBuilder {
+
+ private final Map<ResourceProfile, Integer> unfulfilledRequired;
+ private final Map<ResourceProfile, Integer> availableResources;
+
+ // The variable to maintain the base mappings result related
information, which can
+ // assure that the allocation for all requests could be run
successfully at least.
+ private final Map<ResourceProfile, Map<ResourceProfile, Integer>>
+ baseRequiredResourcePreMappings;
+
+ private ResourceRequestPreMappingsBuilder(
+ Collection<PendingRequest> pendingRequests,
+ Collection<? extends PhysicalSlot> slots) {
+ this.unfulfilledRequired =
+ pendingRequests.stream()
+ .collect(
+ Collectors.groupingBy(
+ PendingRequest::getResourceProfile,
+ Collectors.summingInt(ignored ->
1)));
+ this.unfulfilledRequired
+ .keySet()
+ .forEach(
+ rp ->
+ Preconditions.checkState(
+ !rp.equals(ResourceProfile.ZERO)
+ &&
!rp.equals(ResourceProfile.ANY),
+ "The required resource must not be
ResourceProfile.ZERO and ResourceProfile.ANY."));
+ this.availableResources =
+ slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ PhysicalSlot::getResourceProfile,
+ Collectors.summingInt(ignored ->
1)));
+ this.availableResources
+ .keySet()
+ .forEach(
+ rp ->
+ Preconditions.checkState(
+ !rp.equals(ResourceProfile.UNKNOWN)
+ &&
!rp.equals(ResourceProfile.ZERO),
+ "The resource profile of a slot
must not be ResourceProfile.UNKNOWN and ResourceProfile.ZERO."));
+ this.baseRequiredResourcePreMappings =
+ CollectionUtil.newHashMapWithExpectedSize(slots.size());
+ }
+
+ private ResourceRequestPreMappings build() {
+ if (unfulfilledRequired.isEmpty()
+ || availableResources.isEmpty()
+ || !canFulfillDesiredResources()) {
+ return currentPreMappings(false);
+ }
+
+ buildFineGrainedRequestFulfilledExactMapping();
+ if (isMatchingFulfilled()) {
+ return currentPreMappings(true);
+ }
+
+ buildRemainingFineGrainedRequestFulfilledAnyMapping();
+ if (isMatchingFulfilled()) {
+ return currentPreMappings(true);
+ }
+
+ buildUnknownRequestFulfilledMapping();
+ return currentPreMappings(isMatchingFulfilled());
+ }
+
+ private void buildFineGrainedRequestFulfilledExactMapping() {
+ for (Map.Entry<ResourceProfile, Integer> unfulfilledEntry :
+ new HashMap<>(unfulfilledRequired).entrySet()) {
+ ResourceProfile requiredFineGrainedResourceProfile =
unfulfilledEntry.getKey();
+ if
(ResourceProfile.UNKNOWN.equals(requiredFineGrainedResourceProfile)) {
+ continue;
+ }
+ // checking fine-grained
+ int unfulfilledFineGrainedRequiredCnt =
unfulfilledEntry.getValue();
+ int availableFineGrainedResourceCnt =
+
availableResources.getOrDefault(requiredFineGrainedResourceProfile, 0);
+ if (unfulfilledFineGrainedRequiredCnt <= 0
+ || availableFineGrainedResourceCnt <= 0) {
+ continue;
+ }
+
+ int diff = unfulfilledFineGrainedRequiredCnt -
availableFineGrainedResourceCnt;
+
+ Map<ResourceProfile, Integer> fulfilledProfileCount =
+ baseRequiredResourcePreMappings.computeIfAbsent(
+ requiredFineGrainedResourceProfile, ignored ->
new HashMap<>());
+ fulfilledProfileCount.put(
+ requiredFineGrainedResourceProfile,
+ diff > 0
+ ? availableFineGrainedResourceCnt
+ : unfulfilledFineGrainedRequiredCnt);
+
+ int newUnfulfilledFineGrainedRequiredCnt = Math.max(diff, 0);
+ int unAvailableFineGrainedResourceCnt = Math.max(-diff, 0);
+ availableResources.put(
+ requiredFineGrainedResourceProfile,
unAvailableFineGrainedResourceCnt);
+ unfulfilledRequired.put(
+ requiredFineGrainedResourceProfile,
newUnfulfilledFineGrainedRequiredCnt);
+ }
+ }
+
+ private void buildRemainingFineGrainedRequestFulfilledAnyMapping() {
+ Integer availableResourceProfileANYCount =
+ availableResources.getOrDefault(ResourceProfile.ANY, 0);
+ if (availableResourceProfileANYCount <= 0) {
+ return;
+ }
+
+ for (Map.Entry<ResourceProfile, Integer> unfulfilledEntry :
+ new HashMap<>(unfulfilledRequired).entrySet()) {
+ availableResourceProfileANYCount =
+ availableResources.getOrDefault(ResourceProfile.ANY,
0);
+
+ if (availableResourceProfileANYCount <= 0) {
+ return;
+ }
+ ResourceProfile fineGrainedRequestResourceProfile =
unfulfilledEntry.getKey();
+ if
(ResourceProfile.UNKNOWN.equals(fineGrainedRequestResourceProfile)) {
+ continue;
+ }
+ // checking fine-grained
+ int unfulfilledFineGrainedRequiredCnt =
+
unfulfilledRequired.getOrDefault(fineGrainedRequestResourceProfile, 0);
+ if (unfulfilledFineGrainedRequiredCnt <= 0) {
+ continue;
+ }
+
+ int diff = unfulfilledFineGrainedRequiredCnt -
availableResourceProfileANYCount;
+
+ Map<ResourceProfile, Integer> fulfilledProfileCount =
+ baseRequiredResourcePreMappings.computeIfAbsent(
+ fineGrainedRequestResourceProfile, ignored ->
new HashMap<>());
+ fulfilledProfileCount.put(
+ ResourceProfile.ANY,
+ diff > 0
+ ? availableResourceProfileANYCount
+ : unfulfilledFineGrainedRequiredCnt);
+
+ int newUnfulfilledFineGrainedRequiredCnt = Math.max(diff, 0);
+ int newAvailableResourceProfileANYCount = Math.max(-diff, 0);
+ availableResources.put(ResourceProfile.ANY,
newAvailableResourceProfileANYCount);
+ unfulfilledRequired.put(
+ fineGrainedRequestResourceProfile,
newUnfulfilledFineGrainedRequiredCnt);
+ }
+ }
+
+ private void buildUnknownRequestFulfilledMapping() {
+ if (unfulfilledRequired.getOrDefault(ResourceProfile.UNKNOWN, 0)
<= 0) {
+ return;
+ }
+
+ for (Map.Entry<ResourceProfile, Integer> availableResourceEntry :
+ new HashMap<>(availableResources).entrySet()) {
+ Integer unfulfilledUnknownRequiredCnt =
+
unfulfilledRequired.getOrDefault(ResourceProfile.UNKNOWN, 0);
+ ResourceProfile availableResourceProfile =
availableResourceEntry.getKey();
+ int availableResourceCnt =
+
availableResources.getOrDefault(availableResourceProfile, 0);
+ if (availableResourceCnt <= 0) {
+ continue;
+ }
+ if (unfulfilledUnknownRequiredCnt <= 0) {
+ return;
+ }
+ int diff = unfulfilledUnknownRequiredCnt -
availableResourceCnt;
+ Map<ResourceProfile, Integer> fulfilledProfileCount =
+ baseRequiredResourcePreMappings.computeIfAbsent(
+ ResourceProfile.UNKNOWN, ignored -> new
HashMap<>());
+ fulfilledProfileCount.put(
+ availableResourceProfile,
+ diff > 0 ? availableResourceCnt :
unfulfilledUnknownRequiredCnt);
+
+ int newUnfulfilledUnknownRequiredCnt = Math.max(diff, 0);
+ int newAvailableResourceCnt = Math.max(-diff, 0);
+ availableResources.put(availableResourceProfile,
newAvailableResourceCnt);
+ unfulfilledRequired.put(ResourceProfile.UNKNOWN,
newUnfulfilledUnknownRequiredCnt);
+ }
+ }
+
+ private ResourceRequestPreMappings currentPreMappings(boolean
matchingFulfilled) {
+ if (!matchingFulfilled) {
+ return new ResourceRequestPreMappings(false, new HashMap<>(),
new HashMap<>());
+ }
+ return new ResourceRequestPreMappings(
+ true,
+
Collections.unmodifiableMap(baseRequiredResourcePreMappings),
+ Collections.unmodifiableMap(availableResources));
+ }
+
+ private boolean isMatchingFulfilled() {
+ for (ResourceProfile unfulfilledProfile :
unfulfilledRequired.keySet()) {
+ Integer unfulfilled =
unfulfilledRequired.getOrDefault(unfulfilledProfile, 0);
+ if (unfulfilled > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean canFulfillDesiredResources() {
+ Integer totalUnfulfilledCnt =
+ unfulfilledRequired.values().stream().reduce(0,
Integer::sum);
+ Integer totalAvailableCnt =
+ availableResources.values().stream().reduce(0,
Integer::sum);
+ return totalAvailableCnt >= totalUnfulfilledCnt;
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
index 8b2fb88c637..c70f15ad17e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
@@ -67,12 +67,10 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
public int compare(PhysicalSlotElement left, PhysicalSlotElement
right) {
final LoadingWeight leftLoad =
taskExecutorsLoading.getOrDefault(
-
left.physicalSlot.getTaskManagerLocation().getResourceID(),
- DefaultLoadingWeight.EMPTY);
+ left.getResourceID(), DefaultLoadingWeight.EMPTY);
final LoadingWeight rightLoad =
taskExecutorsLoading.getOrDefault(
-
right.physicalSlot.getTaskManagerLocation().getResourceID(),
- DefaultLoadingWeight.EMPTY);
+ right.getResourceID(), DefaultLoadingWeight.EMPTY);
return leftLoad.compareTo(rightLoad);
}
}
@@ -97,6 +95,14 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
return false;
}
+ public ResourceID getResourceID() {
+ return physicalSlot.getTaskManagerLocation().getResourceID();
+ }
+
+ public ResourceProfile getResourceProfile() {
+ return physicalSlot.getResourceProfile();
+ }
+
@Override
public int hashCode() {
return physicalSlot.hashCode();
@@ -125,17 +131,17 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
- if (pendingRequests.isEmpty()) {
+ ResourceRequestPreMappings resourceRequestPreMappings =
+ ResourceRequestPreMappings.createFrom(pendingRequests, slots);
+ if (!resourceRequestPreMappings.isMatchingFulfilled()) {
return Collections.emptyList();
}
final Collection<RequestSlotMatch> resultingMatches = new
ArrayList<>();
final List<PendingRequest> sortedRequests =
sortByLoadingDescend(pendingRequests);
- LOG.debug(
- "Available slots: {}, sortedRequests: {}, taskExecutorsLoad:
{}",
- slots,
- sortedRequests,
- taskExecutorsLoad);
+
+ logDebugInfo(slots, taskExecutorsLoad, sortedRequests);
+
Collection<PhysicalSlotElement> slotElements =
slots.stream().map(PhysicalSlotElement::new).collect(Collectors.toList());
final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
profileSlots =
@@ -143,33 +149,51 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
final Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots =
groupSlotsByTaskExecutor(slotElements);
for (PendingRequest request : sortedRequests) {
- Optional<PhysicalSlotElement> bestSlotEle =
- tryMatchPhysicalSlot(request, profileSlots,
taskExecutorsLoad);
- if (bestSlotEle.isPresent()) {
- PhysicalSlotElement slotElement = bestSlotEle.get();
- updateReferenceAfterMatching(
- profileSlots,
- taskExecutorsLoad,
- taskExecutorSlots,
- slotElement,
- request.getLoading());
+ ResourceProfile requestProfile = request.getResourceProfile();
+ Optional<PhysicalSlotElement> bestSlotEleOpt =
+ tryMatchPhysicalSlot(
+ request, profileSlots, taskExecutorsLoad,
resourceRequestPreMappings);
+ if (bestSlotEleOpt.isPresent()) {
+ PhysicalSlotElement slotElement = bestSlotEleOpt.get();
+ updateTaskExecutorsLoad(taskExecutorsLoad, request,
slotElement);
+ updateReferenceRemainingSlots(profileSlots, taskExecutorSlots,
slotElement);
+ resourceRequestPreMappings.decrease(
+ requestProfile, slotElement.getResourceProfile());
resultingMatches.add(RequestSlotMatch.createFor(request,
slotElement.physicalSlot));
}
}
return resultingMatches;
}
+ private static void updateTaskExecutorsLoad(
+ Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ PendingRequest request,
+ PhysicalSlotElement slotElement) {
+ taskExecutorsLoad.compute(
+ slotElement.getResourceID(),
+ (ignoredId, oldLoading) ->
+ Objects.isNull(oldLoading)
+ ? request.getLoading()
+ : oldLoading.merge(request.getLoading()));
+ }
+
+ private static void logDebugInfo(
+ Collection<? extends PhysicalSlot> slots,
+ Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ List<PendingRequest> sortedRequests) {
+ LOG.debug(
+ "Available slots: {}, sortedRequests: {}, taskExecutorsLoad:
{}",
+ slots,
+ sortedRequests,
+ taskExecutorsLoad);
+ }
+
private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(
Collection<PhysicalSlotElement> slotElements) {
return slotElements.stream()
.collect(
Collectors.groupingBy(
- physicalSlot ->
- physicalSlot
- .physicalSlot
- .getTaskManagerLocation()
- .getResourceID(),
- Collectors.toSet()));
+ PhysicalSlotElement::getResourceID,
Collectors.toSet()));
}
private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
getSlotCandidatesByProfile(
@@ -180,7 +204,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
for (PhysicalSlotElement slotEle : slotElements) {
result.compute(
- slotEle.physicalSlot.getResourceProfile(),
+ slotEle.getResourceProfile(),
(resourceProfile, oldSlots) -> {
HeapPriorityQueue<PhysicalSlotElement> values =
Objects.isNull(oldSlots)
@@ -197,12 +221,17 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(
PendingRequest request,
Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
profileToSlotMap,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
+ Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ ResourceRequestPreMappings resourceRequestPreMappings) {
final ResourceProfile requestProfile = request.getResourceProfile();
final Set<ResourceProfile> candidateProfiles =
profileToSlotMap.keySet().stream()
- .filter(slotProfile ->
slotProfile.isMatching(requestProfile))
+ .filter(
+ slotProfile ->
+ slotProfile.isMatching(requestProfile)
+ &&
resourceRequestPreMappings.hasAvailableProfile(
+ requestProfile,
slotProfile))
.collect(Collectors.toSet());
return candidateProfiles.stream()
@@ -216,25 +245,17 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
.min(new PhysicalSlotElementComparator(taskExecutorsLoad));
}
- private void updateReferenceAfterMatching(
+ private void updateReferenceRemainingSlots(
Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
profileSlots,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad,
Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots,
- PhysicalSlotElement targetSlotElement,
- LoadingWeight loading) {
- final ResourceID tmID =
-
targetSlotElement.physicalSlot.getTaskManagerLocation().getResourceID();
- // Update the loading for the target task executor.
- taskExecutorsLoad.compute(
- tmID,
- (ignoredId, oldLoading) ->
- Objects.isNull(oldLoading) ? loading :
oldLoading.merge(loading));
+ PhysicalSlotElement targetSlotElement) {
+ final ResourceID tmID = targetSlotElement.getResourceID();
// Update the sorted set for slots that is located on the same task
executor as targetSlot.
// Use Map#remove to avoid the ConcurrentModifyException.
final Set<PhysicalSlotElement> slotToReSort =
taskExecutorSlots.remove(tmID);
for (PhysicalSlotElement slotEle : slotToReSort) {
HeapPriorityQueue<PhysicalSlotElement> slotsOfProfile =
-
profileSlots.get(slotEle.physicalSlot.getResourceProfile());
+ profileSlots.get(slotEle.getResourceProfile());
// Re-add for the latest order.
slotsOfProfile.remove(slotEle);
if (!slotEle.equals(targetSlotElement)) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
index cbb8d2cda4a..3a047d8b45b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
@@ -221,7 +221,7 @@ class PreferredAllocationRequestSlotMatchingStrategyTest {
return createSlot(finedGrainProfile, new AllocationID(), tmLocation);
}
- private static TestingPhysicalSlot createSlot(
+ static TestingPhysicalSlot createSlot(
ResourceProfile profile, AllocationID allocationId,
TaskManagerLocation tmLocation) {
return TestingPhysicalSlot.builder()
.withAllocationID(allocationId)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
new file mode 100644
index 00000000000..c2bf3fd691b
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ResourceRequestPreMappings}. */
+class ResourceRequestPreMappingsTest {
+
+ private static final ResourceProfile smallFineGrainedResourceProfile =
+ ResourceProfile.newBuilder().setManagedMemoryMB(10).build();
+
+ private static final ResourceProfile bigGrainedResourceProfile =
+ ResourceProfile.newBuilder().setManagedMemoryMB(20).build();
+
+ @Test
+ void testIncludeInvalidProfileOfRequestOrResource() {
+ // For invalid resource.
+ ResourceProfile[] profiles =
+ new ResourceProfile[] {ResourceProfile.UNKNOWN,
ResourceProfile.ZERO};
+ for (ResourceProfile profile : profiles) {
+ assertThatThrownBy(
+ () ->
+ ResourceRequestPreMappings.createFrom(
+ Collections.emptyList(),
newTestingSlots(profile)))
+ .isInstanceOf(IllegalStateException.class);
+ }
+
+ // For invalid request.
+ profiles = new ResourceProfile[] {ResourceProfile.ANY,
ResourceProfile.ZERO};
+ for (ResourceProfile profile : profiles) {
+ assertThatThrownBy(
+ () ->
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(profile),
Collections.emptyList()))
+ .isInstanceOf(IllegalStateException.class);
+ }
+ }
+
+ @Test
+ void testBuildWhenUnavailableTotalResourcesOrEmptyRequestsResources() {
+ // Testing for unavailable total resource
+ ResourceRequestPreMappings preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(ResourceProfile.UNKNOWN),
Collections.emptyList());
+ assertThat(preMappings.isMatchingFulfilled()).isFalse();
+ assertThat(preMappings.getBaseRequiredResourcePreMappings()).isEmpty();
+
+ // Testing for empty slots or requests
+ preMappings =
+ ResourceRequestPreMappings.createFrom(
+ Collections.emptyList(), Collections.emptyList());
+ assertNotMatchable(preMappings);
+ }
+
+ @Test
+ void testBuildWhenMissingResourceToMatchFineGrainedRequest() {
+
+ // Testing for missing available fine-grained resources when only
fine-grained request
+ ResourceRequestPreMappings preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ bigGrainedResourceProfile),
+ newTestingSlots(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+ assertNotMatchable(preMappings);
+
+ // Testing for missing available fine-grained resources when
fine-grained and unknown
+ // requests.
+ preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(
+ ResourceProfile.UNKNOWN,
+ smallFineGrainedResourceProfile,
+ bigGrainedResourceProfile),
+ newTestingSlots(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+ assertNotMatchable(preMappings);
+ }
+
+ @Test
+ void testBuildSuccessfullyThatFinedGrainedMatchedExactly() {
+ ResourceRequestPreMappings preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ bigGrainedResourceProfile),
+ newTestingSlots(
+ bigGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+ assertThat(preMappings.isMatchingFulfilled()).isTrue();
+ assertThat(preMappings.getBaseRequiredResourcePreMappings())
+ .hasSize(2)
+ .contains(
+ new AbstractMap.SimpleEntry<>(
+ smallFineGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(smallFineGrainedResourceProfile,
2);
+ }
+ }),
+ new AbstractMap.SimpleEntry<>(
+ bigGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(bigGrainedResourceProfile, 1);
+ }
+ }));
+ assertThat(preMappings.getRemainingFlexibleResources())
+ .contains(new
AbstractMap.SimpleEntry<>(smallFineGrainedResourceProfile, 1));
+ }
+
+ @Test
+ void testBuildSuccessfullyThatFinedGrainedToMatchedUnknownRequests() {
+
+ // Testing for available all resources and no UNKNOWN required
resource.
+ ResourceRequestPreMappings preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(
+ ResourceProfile.UNKNOWN,
+ ResourceProfile.UNKNOWN,
+ smallFineGrainedResourceProfile,
+ bigGrainedResourceProfile),
+ newTestingSlots(
+ bigGrainedResourceProfile,
+ bigGrainedResourceProfile,
+ bigGrainedResourceProfile,
+ ResourceProfile.ANY,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+ assertThat(preMappings.isMatchingFulfilled()).isTrue();
+ assertThat(preMappings.getBaseRequiredResourcePreMappings())
+ .hasSize(3)
+ .contains(
+ new AbstractMap.SimpleEntry<>(
+ smallFineGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(smallFineGrainedResourceProfile,
1);
+ }
+ }),
+ new AbstractMap.SimpleEntry<>(
+ bigGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(bigGrainedResourceProfile, 1);
+ }
+ }));
+ Map<ResourceProfile, Integer> unknownBaseMapping =
+
preMappings.getBaseRequiredResourcePreMappings().get(ResourceProfile.UNKNOWN);
+ assertThat(unknownBaseMapping.values().stream().reduce(0,
Integer::sum)).isEqualTo(2);
+ assertThat(
+
preMappings.getRemainingFlexibleResources().values().stream()
+ .reduce(0, Integer::sum))
+ .isEqualTo(2);
+ }
+
+ @Test
+ void testBuildSuccessfullyThatAnyToMatchedUnknownAndFineGrainedRequests() {
+
+ // Testing for available all resources and no UNKNOWN required
resource.
+ ResourceRequestPreMappings preMappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(
+ ResourceProfile.UNKNOWN,
+ ResourceProfile.UNKNOWN,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ bigGrainedResourceProfile,
+ bigGrainedResourceProfile),
+ newTestingSlots(
+ bigGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ ResourceProfile.ANY,
+ ResourceProfile.ANY,
+ ResourceProfile.ANY,
+ ResourceProfile.ANY));
+ assertThat(preMappings.isMatchingFulfilled()).isTrue();
+ assertThat(preMappings.getBaseRequiredResourcePreMappings())
+ .hasSize(3)
+ .contains(
+ new AbstractMap.SimpleEntry<>(
+ smallFineGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(smallFineGrainedResourceProfile,
1);
+ put(ResourceProfile.ANY, 1);
+ }
+ }),
+ new AbstractMap.SimpleEntry<>(
+ bigGrainedResourceProfile,
+ new HashMap<>() {
+ {
+ put(bigGrainedResourceProfile, 1);
+ put(ResourceProfile.ANY, 1);
+ }
+ }),
+ new AbstractMap.SimpleEntry<>(
+ ResourceProfile.UNKNOWN,
+ new HashMap<>() {
+ {
+ put(ResourceProfile.ANY, 2);
+ }
+ }));
+ assertThat(
+
preMappings.getRemainingFlexibleResources().values().stream()
+ .reduce(0, Integer::sum))
+ .isZero();
+ }
+
+ @Test
+ void testHasAvailableProfile() {
+ ResourceRequestPreMappings mappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN),
+ newTestingSlots(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+
+ // Testing available resource in flexible resources
+ assertThat(
+ mappings.hasAvailableProfile(
+ smallFineGrainedResourceProfile,
smallFineGrainedResourceProfile))
+ .isTrue();
+ assertThat(
+ mappings.hasAvailableProfile(
+ smallFineGrainedResourceProfile,
bigGrainedResourceProfile))
+ .isFalse();
+
+ // Testing available resource in base mapping resources
+ assertThat(
+ mappings.hasAvailableProfile(
+ ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile))
+ .isTrue();
+ assertThat(mappings.hasAvailableProfile(ResourceProfile.UNKNOWN,
bigGrainedResourceProfile))
+ .isFalse();
+ }
+
+ @Test
+ void testDecrease() {
+ // Testing decrease resource in base mapping
+ ResourceRequestPreMappings mappings =
+ ResourceRequestPreMappings.createFrom(
+ newPendingRequests(ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN),
+ newTestingSlots(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile));
+
+ // Testing decrease resource in base mapping resources successfully
+ mappings.decrease(ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile);
+ assertThat(
+ mappings.getAvailableResourceCntOfBasePreMappings(
+ ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile))
+ .isOne();
+ // Testing decrease resource in base mapping resources failed
+ assertThatThrownBy(
+ () ->
+ mappings.decrease(
+ smallFineGrainedResourceProfile,
+ smallFineGrainedResourceProfile))
+ .isInstanceOf(IllegalStateException.class);
+
+ // Testing decrease resource in flexible resources
+ ResourceRequestPreMappings mappings2 =
+ ResourceRequestPreMappings.createFrom(
+ true,
+ new HashMap<>() {
+ {
+ put(
+ ResourceProfile.UNKNOWN,
+ new HashMap<>() {
+ {
+
put(smallFineGrainedResourceProfile, 2);
+ }
+ });
+ }
+ },
+ new HashMap<>() {
+ {
+ put(smallFineGrainedResourceProfile, 1);
+ put(bigGrainedResourceProfile, 2);
+ }
+ });
+ // Testing decrease resource in flexible resources successfully
+ mappings2.decrease(ResourceProfile.UNKNOWN, bigGrainedResourceProfile);
+ assertThat(
+
mappings2.getAvailableResourceCntOfRemainingFlexibleMapping(
+ bigGrainedResourceProfile))
+ .isOne();
+ assertThat(
+ mappings2.getAvailableResourceCntOfBasePreMappings(
+ ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile))
+ .isOne();
+ assertThat(
+
mappings2.getAvailableResourceCntOfRemainingFlexibleMapping(
+ smallFineGrainedResourceProfile))
+ .isEqualTo(2);
+
+ // Testing decrease resource in flexible resources failed
+ mappings2.decrease(ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile);
+ assertThatThrownBy(
+ () ->
+ mappings2.decrease(
+ ResourceProfile.UNKNOWN,
smallFineGrainedResourceProfile))
+ .isInstanceOf(IllegalStateException.class);
+ }
+
+ private List<PendingRequest> newPendingRequests(ResourceProfile...
requiredProfiles) {
+ ArrayList<PendingRequest> pendingRequests = new ArrayList<>();
+ if (requiredProfiles == null || requiredProfiles.length == 0) {
+ return pendingRequests;
+ }
+ for (ResourceProfile requiredProfile : requiredProfiles) {
+ pendingRequests.add(
+ PendingRequest.createNormalRequest(
+ new SlotRequestId(),
+ Preconditions.checkNotNull(requiredProfile),
+ DefaultLoadingWeight.EMPTY,
+ Collections.emptyList()));
+ }
+ return pendingRequests;
+ }
+
+ private List<TestingSlot> newTestingSlots(ResourceProfile... slotProfiles)
{
+ ArrayList<TestingSlot> slots = new ArrayList<>();
+ if (slotProfiles == null || slotProfiles.length == 0) {
+ return slots;
+ }
+ for (ResourceProfile slotProfile : slotProfiles) {
+ slots.add(new
TestingSlot(Preconditions.checkNotNull(slotProfile)));
+ }
+ return slots;
+ }
+
+ private void assertNotMatchable(ResourceRequestPreMappings preMappings) {
+ assertThat(preMappings.isMatchingFulfilled()).isFalse();
+ assertThat(preMappings.getBaseRequiredResourcePreMappings()).isEmpty();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
new file mode 100644
index 00000000000..706c3c8fefe
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static
org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategyTest.createSlot;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testing for {@link TasksBalancedRequestSlotMatchingStrategy}. */
+class TasksBalancedRequestSlotMatchingStrategyTest {
+
+ private static final ResourceProfile smallFineGrainedProfile =
+ ResourceProfile.newBuilder().setCpuCores(1d).build();
+ private static final ResourceProfile bigFineGrainedProfile =
+ ResourceProfile.newBuilder().setCpuCores(2d).build();
+
+ private static final TaskManagerLocation tmLocation1 = new
LocalTaskManagerLocation();
+ private static final TaskManagerLocation tmLocation2 = new
LocalTaskManagerLocation();
+
+ @Test
+ void
testMatchRequestsAndSlotsRiskOfFineGrainedResourcesMatchedToUnknownProfile() {
+ // The case is aiming to check when the numbers of requests and
resources are equals but
+ // having the risk of matching resources that would be matched with
fine-grained request
+ // with ResourceProfile>UNKNOWN.
+ final Collection<PendingRequest> pendingRequests =
+ Arrays.asList(
+ createRequest(ResourceProfile.UNKNOWN, 100),
+ createRequest(bigFineGrainedProfile, 1));
+ List<TestingPhysicalSlot> slots =
+ Arrays.asList(
+ createSlot(bigFineGrainedProfile, new AllocationID(),
tmLocation1),
+ createSlot(smallFineGrainedProfile, new
AllocationID(), tmLocation2));
+ final Collection<RequestSlotMatchingStrategy.RequestSlotMatch>
requestSlotMatches =
+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(
+ slots,
+ pendingRequests,
+ new HashMap<>() {
+ {
+ put(tmLocation1.getResourceID(),
DefaultLoadingWeight.EMPTY);
+ put(tmLocation2.getResourceID(), new
DefaultLoadingWeight(9));
+ }
+ });
+ assertThat(requestSlotMatches).hasSize(2);
+ }
+
+ @Test
+ void testMatchRequestsAndSlotsMissingFineGrainedResources() {
+
+ PendingRequest requestWithBigProfile =
createRequest(bigFineGrainedProfile, 6);
+ PendingRequest requestWithUnknownProfile =
createRequest(ResourceProfile.UNKNOWN, 6);
+ PendingRequest requestWithSmallProfile =
createRequest(smallFineGrainedProfile, 6);
+
+ final Collection<PendingRequest> pendingRequests =
+ Arrays.asList(
+ requestWithSmallProfile, requestWithUnknownProfile,
requestWithBigProfile);
+ List<TestingPhysicalSlot> slots =
+ Arrays.asList(
+ createSlot(
+ bigFineGrainedProfile,
+ new AllocationID(),
+ new LocalTaskManagerLocation()),
+ createSlot(
+ bigFineGrainedProfile,
+ new AllocationID(),
+ new LocalTaskManagerLocation()),
+ createSlot(
+ bigFineGrainedProfile,
+ new AllocationID(),
+ new LocalTaskManagerLocation()));
+ final Collection<RequestSlotMatchingStrategy.RequestSlotMatch>
requestSlotMatches =
+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(
+ slots, pendingRequests, new HashMap<>());
+ assertThat(requestSlotMatches).isEmpty();
+ }
+
+ private static PendingRequest createRequest(ResourceProfile
requestProfile, float loading) {
+ return PendingRequest.createNormalRequest(
+ new SlotRequestId(),
+ requestProfile,
+ new DefaultLoadingWeight(loading),
+ Collections.emptyList());
+ }
+}