This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ab49fae9960 [FLINK-38799][runtime] Rename `LoadingWeight` to `TaskExecutionLoad` (#27333)
ab49fae9960 is described below
commit ab49fae99606b87d8facc7eb404bda1b27bf9c47
Author: Ferenc Csaky <[email protected]>
AuthorDate: Fri Jan 23 10:41:53 2026 +0100
[FLINK-38799][runtime] Rename `LoadingWeight` to `TaskExecutionLoad` (#27333)
---
.../slotpool/DeclarativeSlotPoolBridge.java | 21 +++----
.../runtime/jobmaster/slotpool/PendingRequest.java | 28 ++++-----
.../jobmaster/slotpool/PhysicalSlotRequest.java | 20 +++----
...erredAllocationRequestSlotMatchingStrategy.java | 12 ++--
.../slotpool/RequestSlotMatchingStrategy.java | 6 +-
.../SimpleRequestSlotMatchingStrategy.java | 4 +-
.../TasksBalancedRequestSlotMatchingStrategy.java | 70 ++++++++++++----------
.../scheduler/ExecutionSlotSharingGroup.java | 16 ++---
.../scheduler/SimpleExecutionSlotAllocator.java | 4 +-
.../SlotSharingExecutionSlotAllocator.java | 2 +-
.../allocator/SlotSharingSlotAllocator.java | 12 ++--
.../TasksBalancedSlotMatchingResolver.java | 39 ++++++------
.../runtime/scheduler/loading/LoadingWeight.java | 43 -------------
.../DefaultTaskExecutionLoad.java} | 51 ++++++++--------
.../HasTaskExecutionLoad.java} | 22 ++++---
.../scheduler/taskexecload/TaskExecutionLoad.java | 65 ++++++++++++++++++++
.../slotpool/PhysicalSlotProviderExtension.java | 4 +-
...lSlotProviderImplWithSpreadOutStrategyTest.java | 4 +-
.../slotpool/PhysicalSlotRequestUtils.java | 6 +-
...dAllocationRequestSlotMatchingStrategyTest.java | 8 +--
.../slotpool/ResourceRequestPreMappingsTest.java | 4 +-
.../SimpleRequestSlotMatchingStrategyTest.java | 10 ++--
...sksBalancedRequestSlotMatchingStrategyTest.java | 8 +--
.../slotpool/TestingDeclarativeSlotPool.java | 8 +--
.../TestingDeclarativeSlotPoolBuilder.java | 6 +-
.../AbstractSlotMatchingResolverTest.java | 12 ++--
.../DefaultTaskExecutionLoadTest.java} | 22 +++++--
27 files changed, 273 insertions(+), 234 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index a55e8c80b2f..2fdd45b5018 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -28,7 +28,7 @@ import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -66,13 +66,13 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
private static final class FulfilledAllocation {
final AllocationID allocationID;
final ResourceID taskExecutorID;
- final LoadingWeight loadingWeight;
+ final TaskExecutionLoad taskExecutionLoad;
- FulfilledAllocation(PhysicalSlot slot, LoadingWeight loadingWeight) {
+ FulfilledAllocation(PhysicalSlot slot, TaskExecutionLoad
taskExecutionLoad) {
this.allocationID =
Preconditions.checkNotNull(slot.getAllocationId());
this.taskExecutorID =
Preconditions.checkNotNull(slot.getTaskManagerLocation().getResourceID());
- this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
+ this.taskExecutionLoad =
Preconditions.checkNotNull(taskExecutionLoad);
}
@Override
@@ -83,7 +83,7 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
FulfilledAllocation that = (FulfilledAllocation) o;
return Objects.equals(allocationID, that.allocationID)
&& Objects.equals(taskExecutorID, that.taskExecutorID)
- && Objects.equals(loadingWeight, that.loadingWeight);
+ && Objects.equals(taskExecutionLoad,
that.taskExecutionLoad);
}
}
@@ -261,16 +261,16 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
}
}
- private Map<ResourceID, LoadingWeight> getTaskExecutorsLoadingView() {
- final Map<ResourceID, LoadingWeight> result = new HashMap<>();
+ private Map<ResourceID, TaskExecutionLoad> getTaskExecutorsLoadingView() {
+ final Map<ResourceID, TaskExecutionLoad> result = new HashMap<>();
Collection<FulfilledAllocation> fulfilledAllocations =
fulfilledRequests.values();
for (FulfilledAllocation allocation : fulfilledAllocations) {
result.compute(
allocation.taskExecutorID,
(ignoredID, oldLoading) ->
Objects.isNull(oldLoading)
- ? allocation.loadingWeight
- :
oldLoading.merge(allocation.loadingWeight));
+ ? allocation.taskExecutionLoad
+ :
oldLoading.merge(allocation.taskExecutionLoad));
}
return result;
}
@@ -353,7 +353,8 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
getDeclarativeSlotPool()
.reserveFreeSlot(allocationId,
pendingRequest.getResourceProfile());
fulfilledRequests.put(
- slotRequestId, new FulfilledAllocation(slot,
pendingRequest.getLoading()));
+ slotRequestId,
+ new FulfilledAllocation(slot,
pendingRequest.getTaskExecutionLoad()));
return slot;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
index 339db7f575f..565be31b8f6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -32,13 +32,13 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-public final class PendingRequest implements WeightLoadable {
+public final class PendingRequest implements HasTaskExecutionLoad {
private final SlotRequestId slotRequestId;
private final ResourceProfile resourceProfile;
- private final LoadingWeight loadingWeight;
+ private final TaskExecutionLoad taskExecutionLoad;
private final HashSet<AllocationID> preferredAllocations;
@@ -51,12 +51,12 @@ public final class PendingRequest implements WeightLoadable
{
private PendingRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
- LoadingWeight loadingWeight,
+ TaskExecutionLoad taskExecutionLoad,
Collection<AllocationID> preferredAllocations,
boolean isBatchRequest) {
this.slotRequestId = slotRequestId;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
- this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
+ this.taskExecutionLoad = Preconditions.checkNotNull(taskExecutionLoad);
this.preferredAllocations = new HashSet<>(preferredAllocations);
this.isBatchRequest = isBatchRequest;
this.slotFuture = new CompletableFuture<>();
@@ -66,19 +66,19 @@ public final class PendingRequest implements WeightLoadable
{
static PendingRequest createBatchRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
- LoadingWeight loadingWeight,
+ TaskExecutionLoad taskExecutionLoad,
Collection<AllocationID> preferredAllocations) {
return new PendingRequest(
- slotRequestId, resourceProfile, loadingWeight,
preferredAllocations, true);
+ slotRequestId, resourceProfile, taskExecutionLoad,
preferredAllocations, true);
}
public static PendingRequest createNormalRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
- LoadingWeight loadingWeight,
+ TaskExecutionLoad taskExecutionLoad,
Collection<AllocationID> preferredAllocations) {
return new PendingRequest(
- slotRequestId, resourceProfile, loadingWeight,
preferredAllocations, false);
+ slotRequestId, resourceProfile, taskExecutionLoad,
preferredAllocations, false);
}
SlotRequestId getSlotRequestId() {
@@ -134,8 +134,8 @@ public final class PendingRequest implements WeightLoadable
{
+ slotRequestId
+ ", resourceProfile="
+ resourceProfile
- + ", loadingWeight="
- + loadingWeight
+ + ", taskExecutionLoad="
+ + taskExecutionLoad
+ ", preferredAllocations="
+ preferredAllocations
+ ", isBatchRequest="
@@ -146,7 +146,7 @@ public final class PendingRequest implements WeightLoadable
{
}
@Override
- public @Nonnull LoadingWeight getLoading() {
- return loadingWeight;
+ public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
+ return taskExecutionLoad;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
index 08a9bfe5dbf..1a48cefadee 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -21,31 +21,31 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import javax.annotation.Nonnull;
/** Represents a request for a physical slot. */
-public class PhysicalSlotRequest implements WeightLoadable {
+public class PhysicalSlotRequest implements HasTaskExecutionLoad {
private final SlotRequestId slotRequestId;
private final SlotProfile slotProfile;
- private final LoadingWeight loadingWeight;
+ private final TaskExecutionLoad taskExecutionLoad;
private final boolean slotWillBeOccupiedIndefinitely;
public PhysicalSlotRequest(
final SlotRequestId slotRequestId,
final SlotProfile slotProfile,
- final LoadingWeight loadingWeight,
+ final TaskExecutionLoad taskExecutionLoad,
final boolean slotWillBeOccupiedIndefinitely) {
this.slotRequestId = slotRequestId;
this.slotProfile = slotProfile;
- this.loadingWeight = loadingWeight;
+ this.taskExecutionLoad = taskExecutionLoad;
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
}
@@ -70,18 +70,18 @@ public class PhysicalSlotRequest implements WeightLoadable {
? PendingRequest.createNormalRequest(
slotRequestId,
slotProfile.getPhysicalSlotResourceProfile(),
- loadingWeight,
+ taskExecutionLoad,
slotProfile.getPreferredAllocations())
: PendingRequest.createBatchRequest(
slotRequestId,
slotProfile.getPhysicalSlotResourceProfile(),
- loadingWeight,
+ taskExecutionLoad,
slotProfile.getPreferredAllocations());
}
@Override
- public @Nonnull LoadingWeight getLoading() {
- return loadingWeight;
+ public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
+ return taskExecutionLoad;
}
/** Result of a {@link PhysicalSlotRequest}. */
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
index d6654b7a254..7756e932002 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -58,7 +58,7 @@ public class PreferredAllocationRequestSlotMatchingStrategy
implements RequestSl
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
- Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
final Collection<RequestSlotMatch> requestSlotMatches = new
ArrayList<>();
final Map<AllocationID, PhysicalSlot> freeSlots =
@@ -97,11 +97,11 @@ public class PreferredAllocationRequestSlotMatchingStrategy
implements RequestSl
.getPreferredAllocations()
.contains(freeSlot.getAllocationId())) {
requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot));
- LoadingWeight deltaLoading = pendingRequest.getLoading();
- taskExecutorsLoadingWeight.compute(
+ TaskExecutionLoad deltaLoad =
pendingRequest.getTaskExecutionLoad();
+ taskExecutionLoadMap.compute(
freeSlot.getTaskManagerLocation().getResourceID(),
(ignoredId, oldLoad) ->
- oldLoad == null ? deltaLoading :
deltaLoading.merge(oldLoad));
+ oldLoad == null ? deltaLoad :
deltaLoad.merge(oldLoad));
pendingRequestIterator.remove();
freeSlotsIterator.remove();
break;
@@ -113,7 +113,7 @@ public class PreferredAllocationRequestSlotMatchingStrategy
implements RequestSl
if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
requestSlotMatches.addAll(
rollbackStrategy.matchRequestsAndSlots(
- freeSlots.values(), unmatchedRequests,
taskExecutorsLoadingWeight));
+ freeSlots.values(), unmatchedRequests,
taskExecutionLoadMap));
}
return requestSlotMatches;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
index d4774d72f5d..fe0dec37bfe 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import java.util.Collection;
import java.util.Map;
@@ -32,13 +32,13 @@ public interface RequestSlotMatchingStrategy {
*
* @param slots slots to match
* @param pendingRequests slot requests to match
- * @param taskExecutorsLoadingWeight current task executors loading weight
information
+ * @param taskExecutionLoadMap task execution load information by resource
* @return resulting matches of this operation
*/
Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
- Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight);
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap);
/** Result class representing matches. */
final class RequestSlotMatch {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
index cf354afbcad..660d6d95d87 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,7 +38,7 @@ public enum SimpleRequestSlotMatchingStrategy implements
RequestSlotMatchingStra
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
- Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
final Collection<RequestSlotMatch> resultingMatches = new
ArrayList<>();
// if pendingRequests has a special order, then let's preserve it
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
index c70f15ad17e..7119e43fab7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
@@ -42,7 +42,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+import static
org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad.sortByTaskExecutionLoadDesc;
/**
* The tasks balanced based implementation of {@link
RequestSlotMatchingStrategy} that matches the
@@ -57,20 +57,20 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
/** The {@link PhysicalSlotElement} comparator to compare loading. */
static final class PhysicalSlotElementComparator implements
Comparator<PhysicalSlotElement> {
- private final Map<ResourceID, LoadingWeight> taskExecutorsLoading;
+ private final Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap;
- PhysicalSlotElementComparator(Map<ResourceID, LoadingWeight>
taskExecutorsLoading) {
- this.taskExecutorsLoading =
Preconditions.checkNotNull(taskExecutorsLoading);
+ PhysicalSlotElementComparator(Map<ResourceID, TaskExecutionLoad>
taskExecutionLoadMap) {
+ this.taskExecutionLoadMap =
Preconditions.checkNotNull(taskExecutionLoadMap);
}
@Override
public int compare(PhysicalSlotElement left, PhysicalSlotElement
right) {
- final LoadingWeight leftLoad =
- taskExecutorsLoading.getOrDefault(
- left.getResourceID(), DefaultLoadingWeight.EMPTY);
- final LoadingWeight rightLoad =
- taskExecutorsLoading.getOrDefault(
- right.getResourceID(), DefaultLoadingWeight.EMPTY);
+ final TaskExecutionLoad leftLoad =
+ taskExecutionLoadMap.getOrDefault(
+ left.getResourceID(),
DefaultTaskExecutionLoad.EMPTY);
+ final TaskExecutionLoad rightLoad =
+ taskExecutionLoadMap.getOrDefault(
+ right.getResourceID(),
DefaultTaskExecutionLoad.EMPTY);
return leftLoad.compareTo(rightLoad);
}
}
@@ -115,9 +115,10 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
private final PhysicalSlotElementComparator
physicalSlotElementComparator;
- PhysicalSlotElementPriorityComparator(Map<ResourceID, LoadingWeight>
taskExecutorsLoading) {
+ PhysicalSlotElementPriorityComparator(
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
this.physicalSlotElementComparator =
- new PhysicalSlotElementComparator(taskExecutorsLoading);
+ new PhysicalSlotElementComparator(taskExecutionLoadMap);
}
@Override
@@ -130,7 +131,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
ResourceRequestPreMappings resourceRequestPreMappings =
ResourceRequestPreMappings.createFrom(pendingRequests, slots);
if (!resourceRequestPreMappings.isMatchingFulfilled()) {
@@ -138,24 +139,27 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
}
final Collection<RequestSlotMatch> resultingMatches = new
ArrayList<>();
- final List<PendingRequest> sortedRequests =
sortByLoadingDescend(pendingRequests);
+ final List<PendingRequest> sortedRequests =
sortByTaskExecutionLoadDesc(pendingRequests);
- logDebugInfo(slots, taskExecutorsLoad, sortedRequests);
+ logDebugInfo(slots, taskExecutionLoadMap, sortedRequests);
Collection<PhysicalSlotElement> slotElements =
slots.stream().map(PhysicalSlotElement::new).collect(Collectors.toList());
final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
profileSlots =
- getSlotCandidatesByProfile(slotElements, taskExecutorsLoad);
+ getSlotCandidatesByProfile(slotElements, taskExecutionLoadMap);
final Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots =
groupSlotsByTaskExecutor(slotElements);
for (PendingRequest request : sortedRequests) {
ResourceProfile requestProfile = request.getResourceProfile();
Optional<PhysicalSlotElement> bestSlotEleOpt =
tryMatchPhysicalSlot(
- request, profileSlots, taskExecutorsLoad,
resourceRequestPreMappings);
+ request,
+ profileSlots,
+ taskExecutionLoadMap,
+ resourceRequestPreMappings);
if (bestSlotEleOpt.isPresent()) {
PhysicalSlotElement slotElement = bestSlotEleOpt.get();
- updateTaskExecutorsLoad(taskExecutorsLoad, request,
slotElement);
+ updateTaskExecutorsLoad(taskExecutionLoadMap, request,
slotElement);
updateReferenceRemainingSlots(profileSlots, taskExecutorSlots,
slotElement);
resourceRequestPreMappings.decrease(
requestProfile, slotElement.getResourceProfile());
@@ -166,26 +170,26 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
}
private static void updateTaskExecutorsLoad(
- Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
PendingRequest request,
PhysicalSlotElement slotElement) {
- taskExecutorsLoad.compute(
+ taskExecutionLoadMap.compute(
slotElement.getResourceID(),
- (ignoredId, oldLoading) ->
- Objects.isNull(oldLoading)
- ? request.getLoading()
- : oldLoading.merge(request.getLoading()));
+ (ignoredId, oldLoad) ->
+ Objects.isNull(oldLoad)
+ ? request.getTaskExecutionLoad()
+ :
oldLoad.merge(request.getTaskExecutionLoad()));
}
private static void logDebugInfo(
Collection<? extends PhysicalSlot> slots,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
List<PendingRequest> sortedRequests) {
LOG.debug(
- "Available slots: {}, sortedRequests: {}, taskExecutorsLoad:
{}",
+ "Available slots: {}, sortedRequests: {},
taskExecutionLoadMap: {}",
slots,
sortedRequests,
- taskExecutorsLoad);
+ taskExecutionLoadMap);
}
private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(
@@ -198,10 +202,10 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
getSlotCandidatesByProfile(
Collection<PhysicalSlotElement> slotElements,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
result = new HashMap<>();
final PhysicalSlotElementPriorityComparator
physicalSlotElementPriorityComparator =
- new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
+ new
PhysicalSlotElementPriorityComparator(taskExecutionLoadMap);
for (PhysicalSlotElement slotEle : slotElements) {
result.compute(
slotEle.getResourceProfile(),
@@ -221,7 +225,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(
PendingRequest request,
Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>
profileToSlotMap,
- Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+ Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
ResourceRequestPreMappings resourceRequestPreMappings) {
final ResourceProfile requestProfile = request.getResourceProfile();
@@ -242,7 +246,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy
implements RequestSlotMatch
return Objects.isNull(slots) ? null : slots.peek();
})
.filter(Objects::nonNull)
- .min(new PhysicalSlotElementComparator(taskExecutorsLoad));
+ .min(new PhysicalSlotElementComparator(taskExecutionLoadMap));
}
private void updateReferenceRemainingSlots(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
index 7f3dc35d5ed..33d9bbc21f0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -33,7 +33,7 @@ import java.util.HashSet;
import java.util.Set;
/** Represents execution vertices that will run the same shared slot. */
-public class ExecutionSlotSharingGroup implements WeightLoadable {
+public class ExecutionSlotSharingGroup implements HasTaskExecutionLoad {
private final Set<ExecutionVertexID> executionVertexIds;
@@ -69,14 +69,14 @@ public class ExecutionSlotSharingGroup implements
WeightLoadable {
+ executionVertexIds
+ ", slotSharingGroup="
+ slotSharingGroup
- + ", loadingWeight="
- + getLoading()
+ + ", taskExecutionLoad="
+ + getTaskExecutionLoad()
+ '}';
}
@Nonnull
@Override
- public LoadingWeight getLoading() {
- return new DefaultLoadingWeight(executionVertexIds.size());
+ public TaskExecutionLoad getTaskExecutionLoad() {
+ return new DefaultTaskExecutionLoad(executionVertexIds.size());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
index 0a5e79777b2..74d46e82b40 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -28,7 +28,7 @@ import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.FlinkException;
@@ -109,7 +109,7 @@ public class SimpleExecutionSlotAllocator implements
ExecutionSlotAllocator {
new PhysicalSlotRequest(
slotRequestId,
slotProfile,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
slotWillBeOccupiedIndefinitely);
physicalSlotRequests.add(request);
remainingExecutionsToSlotRequest.put(slotRequestId,
executionAttemptId);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 00a8ebd020c..7f926444390 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -272,7 +272,7 @@ class SlotSharingExecutionSlotAllocator implements
ExecutionSlotAllocator {
new PhysicalSlotRequest(
physicalSlotRequestId,
slotProfile,
- group.getLoading(),
+ group.getTaskExecutionLoad(),
slotWillBeOccupiedIndefinitely);
slotRequests.add(request);
requestToGroup.put(physicalSlotRequestId, group);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 39bfa7902a3..7d8149efdf9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -28,10 +28,10 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
@@ -344,7 +344,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
/** The execution slot sharing group for adaptive scheduler. */
- public static class ExecutionSlotSharingGroup implements WeightLoadable {
+ public static class ExecutionSlotSharingGroup implements
HasTaskExecutionLoad {
private final String id;
private final SlotSharingGroup slotSharingGroup;
private final Set<ExecutionVertexID> containedExecutionVertices;
@@ -382,8 +382,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
@Nonnull
@Override
- public LoadingWeight getLoading() {
- return new DefaultLoadingWeight(containedExecutionVertices.size());
+ public TaskExecutionLoad getTaskExecutionLoad() {
+ return new
DefaultTaskExecutionLoad(containedExecutionVertices.size());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
index d4ee374227e..35623e2c77f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.util.CollectionUtil;
import java.util.ArrayList;
@@ -35,7 +35,7 @@ import java.util.TreeMap;
import static
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import static
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import static
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+import static
org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad.sortByTaskExecutionLoadDesc;
/** The tasks balanced request slot matching resolver implementation. */
public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver {
@@ -49,17 +49,17 @@ public enum TasksBalancedSlotMatchingResolver implements
SlotMatchingResolver {
new ArrayList<>(requestGroups.size());
final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
- final TreeMap<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap =
- getLoadingSlotsMap(freeSlots);
+ final TreeMap<TaskExecutionLoad, Set<PhysicalSlot>> loadingSlotsMap =
+ getSlotsByTaskExecutionLoad(freeSlots);
- SlotTaskExecutorWeight<LoadingWeight> best;
- for (ExecutionSlotSharingGroup requestGroup :
sortByLoadingDescend(requestGroups)) {
- best = getTheBestSlotTaskExecutorLoading(loadingSlotsMap);
+ SlotTaskExecutorWeight<TaskExecutionLoad> best;
+ for (ExecutionSlotSharingGroup requestGroup :
sortByTaskExecutionLoadDesc(requestGroups)) {
+ best = getLeastLoadedTaskSlot(loadingSlotsMap);
slotAssignments.add(new SlotAssignment(best.physicalSlot,
requestGroup));
// Update the references
- final LoadingWeight newLoading =
- best.taskExecutorWeight.merge(requestGroup.getLoading());
+ final TaskExecutionLoad newLoading =
+
best.taskExecutorWeight.merge(requestGroup.getTaskExecutionLoad());
updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
Set<PhysicalSlot> physicalSlots =
slotsPerTaskExecutor.get(best.getResourceID());
updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots,
newLoading);
@@ -68,10 +68,10 @@ public enum TasksBalancedSlotMatchingResolver implements
SlotMatchingResolver {
}
private static void updateLoadingSlotsMap(
- Map<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap,
- SlotTaskExecutorWeight<LoadingWeight> best,
+ Map<TaskExecutionLoad, Set<PhysicalSlot>> loadingSlotsMap,
+ SlotTaskExecutorWeight<TaskExecutionLoad> best,
Set<PhysicalSlot> slotsToAdjust,
- LoadingWeight newLoading) {
+ TaskExecutionLoad newLoading) {
Set<PhysicalSlot> physicalSlots =
loadingSlotsMap.get(best.taskExecutorWeight);
if (!CollectionUtil.isNullOrEmpty(physicalSlots)) {
physicalSlots.remove(best.physicalSlot);
@@ -96,7 +96,7 @@ public enum TasksBalancedSlotMatchingResolver implements
SlotMatchingResolver {
private static void updateSlotsPerTaskExecutor(
Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
- SlotTaskExecutorWeight<LoadingWeight> best) {
+ SlotTaskExecutorWeight<TaskExecutionLoad> best) {
Set<PhysicalSlot> slots =
slotsPerTaskExecutor.get(best.getResourceID());
if (Objects.nonNull(slots)) {
slots.remove(best.physicalSlot);
@@ -106,21 +106,22 @@ public enum TasksBalancedSlotMatchingResolver implements
SlotMatchingResolver {
}
}
- private static TreeMap<LoadingWeight, Set<PhysicalSlot>>
getLoadingSlotsMap(
+ private static TreeMap<TaskExecutionLoad, Set<PhysicalSlot>>
getSlotsByTaskExecutionLoad(
Collection<PhysicalSlot> slots) {
return new TreeMap<>() {
{
HashSet<PhysicalSlot> slotsValue =
CollectionUtil.newHashSetWithExpectedSize(slots.size());
slotsValue.addAll(slots);
- put(DefaultLoadingWeight.EMPTY, slotsValue);
+ put(DefaultTaskExecutionLoad.EMPTY, slotsValue);
}
};
}
- private static SlotTaskExecutorWeight<LoadingWeight>
getTheBestSlotTaskExecutorLoading(
- TreeMap<LoadingWeight, Set<PhysicalSlot>> slotsByLoading) {
- final Map.Entry<LoadingWeight, Set<PhysicalSlot>> firstEntry =
slotsByLoading.firstEntry();
+ private static SlotTaskExecutorWeight<TaskExecutionLoad>
getLeastLoadedTaskSlot(
+ TreeMap<TaskExecutionLoad, Set<PhysicalSlot>>
slotsByTaskExecutionLoad) {
+ final Map.Entry<TaskExecutionLoad, Set<PhysicalSlot>> firstEntry =
+ slotsByTaskExecutionLoad.firstEntry();
if (firstEntry == null
|| firstEntry.getKey() == null
|| CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
deleted file mode 100644
index 80ec766cc6a..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.loading;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-
-/** The class is used to represent the loading weight abstraction. */
-@Internal
-public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable
{
-
- /**
- * Get the loading value.
- *
- * @return A float represented the loading.
- */
- float getLoading();
-
- /**
- * Merge the other loading weight and this one into a new object.
- *
- * @param other A loading weight object.
- * @return The new merged {@link LoadingWeight}.
- */
- LoadingWeight merge(LoadingWeight other);
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
similarity index 52%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
index c7c41505ae2..2b41275e0e5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
@@ -16,44 +16,47 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
import java.util.Objects;
-/** The default implementation of {@link LoadingWeight}. */
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The default implementation of {@link TaskExecutionLoad}. */
@Internal
-public class DefaultLoadingWeight implements LoadingWeight {
+public class DefaultTaskExecutionLoad implements TaskExecutionLoad {
- public static final LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+ public static final TaskExecutionLoad EMPTY = new
DefaultTaskExecutionLoad(0f);
- private final float loading;
+ private final float loadValue;
- public DefaultLoadingWeight(float loading) {
- Preconditions.checkArgument(loading >= 0.0f);
- this.loading = loading;
+ public DefaultTaskExecutionLoad(float loadValue) {
+ checkArgument(loadValue >= 0.0f);
+ this.loadValue = loadValue;
}
@Override
- public float getLoading() {
- return loading;
+ public float getLoadValue() {
+ return loadValue;
}
@Override
- public LoadingWeight merge(LoadingWeight other) {
- if (other == null) {
- return this;
- }
- return new DefaultLoadingWeight(loading + other.getLoading());
+ public int getLoadValueAsInt() {
+ return (int) loadValue;
+ }
+
+ @Override
+ public TaskExecutionLoad merge(TaskExecutionLoad other) {
+ return other == null
+ ? this
+ : new DefaultTaskExecutionLoad(loadValue +
other.getLoadValue());
}
@Override
- public int compareTo(@Nonnull LoadingWeight o) {
- return Float.compare(loading, o.getLoading());
+ public int compareTo(TaskExecutionLoad o) {
+ return Float.compare(loadValue, o.getLoadValue());
}
@Override
@@ -64,17 +67,17 @@ public class DefaultLoadingWeight implements LoadingWeight {
if (o == null || getClass() != o.getClass()) {
return false;
}
- DefaultLoadingWeight that = (DefaultLoadingWeight) o;
- return Float.compare(loading, that.loading) == 0f;
+ DefaultTaskExecutionLoad that = (DefaultTaskExecutionLoad) o;
+ return compareTo(that) == 0f;
}
@Override
public int hashCode() {
- return Objects.hash(loading);
+ return Objects.hash(loadValue);
}
@Override
public String toString() {
- return "DefaultLoadingWeight{loading=" + loading + '}';
+ return "DefaultTaskExecutionLoad{load=" + loadValue + '}';
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
similarity index 65%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
index 72f8b994ab2..2f6d93590eb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
@@ -16,36 +16,34 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
import org.apache.flink.annotation.Internal;
-import javax.annotation.Nonnull;
-
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
- * The interface that holds the {@link LoadingWeight} getter is required for
corresponding
+ * The interface that holds the {@link TaskExecutionLoad} getter is required
for corresponding
* abstractions.
*/
@Internal
-public interface WeightLoadable {
+public interface HasTaskExecutionLoad {
/**
- * Get the loading weight.
+ * Retrieves the task execution load.
*
- * @return An implementation object of {@link LoadingWeight}.
+ * @return a {@link TaskExecutionLoad} instance
*/
- @Nonnull
- LoadingWeight getLoading();
+ TaskExecutionLoad getTaskExecutionLoad();
- static <T extends WeightLoadable> List<T>
sortByLoadingDescend(Collection<T> weightLoadables) {
- return weightLoadables.stream()
+ static <T extends HasTaskExecutionLoad> List<T>
sortByTaskExecutionLoadDesc(Collection<T> c) {
+ return c.stream()
.sorted(
(leftReq, rightReq) ->
-
rightReq.getLoading().compareTo(leftReq.getLoading()))
+ rightReq.getTaskExecutionLoad()
+
.compareTo(leftReq.getTaskExecutionLoad()))
.collect(Collectors.toList());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
new file mode 100644
index 00000000000..842cb865c52
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.taskexecload;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * Abstraction describing the amount of {@code task execution} assigned to a
scheduling target.
+ *
+ * <p>This is a scheduler-facing metric that supports comparing and
aggregating placement choices.
+ * It is <b>not</b> a direct measurement of runtime resource pressure (for
example CPU or memory
+ * utilization) on a {@code TaskExecutor}.
+ *
+ * <p>During scheduling, a load value may be associated with different
scheduler concepts on both
+ * sides of a placement decision:
+ *
+ * <ul>
+ * <li><b>Resource requests</b> (for example slot-sharing related groupings
in the {@code
+ * AdaptiveScheduler} or pending requests in the default scheduler)
+ * <li><b>Resource entities</b> (for example {@code TaskManager} instances)
+ * </ul>
+ */
+@Internal
+public interface TaskExecutionLoad extends Comparable<TaskExecutionLoad>,
Serializable {
+
+ /**
+ * Returns the task execution load value.
+ *
+ * @return the current task execution load value
+ */
+ float getLoadValue();
+
+ /**
+ * Returns the task execution load value as an integer, truncated if
necessary.
+ *
+ * @return the task execution load value, truncated to an {@code int}
+ */
+ int getLoadValueAsInt();
+
+ /**
+ * Merges another task execution load with this one into a new object.
+ *
+ * @param other another task execution load object
+ * @return the new merged {@link TaskExecutionLoad}.
+ */
+ TaskExecutionLoad merge(TaskExecutionLoad other);
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
index 3e1e3dfa2cb..f3652a51a2c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
@@ -23,7 +23,7 @@ import
org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -102,7 +102,7 @@ public class PhysicalSlotProviderExtension implements
BeforeEachCallback, AfterE
return new PhysicalSlotRequest(
new SlotRequestId(),
SlotProfileTestingUtils.noLocality(ResourceProfile.UNKNOWN),
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
false);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
index 00d42d98848..001a7fccc1e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.junit.jupiter.api.Test;
@@ -85,7 +85,7 @@ class PhysicalSlotProviderImplWithSpreadOutStrategyTest {
SlotProfileTestingUtils.preferredLocality(
ResourceProfile.ANY,
Collections.singleton(preferredTaskManagerLocation)),
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
false);
PhysicalSlotRequest.Result result1 =
physicalSlotProviderExtension.allocateSlot(request1).get();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
index 7462f07c679..239e7990371 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
@@ -22,7 +22,7 @@ import
org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import java.util.Collection;
import java.util.Collections;
@@ -44,7 +44,7 @@ class PhysicalSlotRequestUtils {
Collections.emptyList(),
preferredAllocations,
Collections.emptySet()),
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
true);
}
@@ -71,7 +71,7 @@ class PhysicalSlotRequestUtils {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptySet()),
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
false);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
index 3a047d8b45b..5df1164d0b6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
@@ -22,7 +22,7 @@ import
org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLoggerExtension;
@@ -103,12 +103,12 @@ class PreferredAllocationRequestSlotMatchingStrategyTest {
PendingRequest.createNormalRequest(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.singleton(allocationId2)),
PendingRequest.createNormalRequest(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.singleton(allocationId1)));
final Collection<RequestSlotMatchingStrategy.RequestSlotMatch>
requestSlotMatches =
@@ -204,7 +204,7 @@ class PreferredAllocationRequestSlotMatchingStrategyTest {
return PendingRequest.createNormalRequest(
new SlotRequestId(),
requestProfile,
- new DefaultLoadingWeight(loading),
+ new DefaultTaskExecutionLoad(loading),
preferAllocationIds);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
index c2bf3fd691b..a9d3dfe6ab1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.util.Preconditions;
import org.junit.jupiter.api.Test;
@@ -357,7 +357,7 @@ class ResourceRequestPreMappingsTest {
PendingRequest.createNormalRequest(
new SlotRequestId(),
Preconditions.checkNotNull(requiredProfile),
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.emptyList()));
}
return pendingRequests;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
index 32f834e6fcf..93c34ae2dd9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterators;
@@ -50,13 +50,13 @@ public class SimpleRequestSlotMatchingStrategyTest {
PendingRequest.createNormalRequest(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.emptyList());
final PendingRequest pendingRequest2 =
PendingRequest.createNormalRequest(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.emptyList());
final Collection<PendingRequest> pendingRequests =
Arrays.asList(pendingRequest1, pendingRequest2);
@@ -90,13 +90,13 @@ public class SimpleRequestSlotMatchingStrategyTest {
PendingRequest.createNormalRequest(
new SlotRequestId(),
large,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.emptyList());
final PendingRequest pendingRequest2 =
PendingRequest.createNormalRequest(
new SlotRequestId(),
small,
- DefaultLoadingWeight.EMPTY,
+ DefaultTaskExecutionLoad.EMPTY,
Collections.emptyList());
final Collection<PendingRequest> pendingRequests =
Arrays.asList(pendingRequest1, pendingRequest2);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
index 706c3c8fefe..1b86f92e3fc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
@@ -22,7 +22,7 @@ import
org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -67,8 +67,8 @@ class TasksBalancedRequestSlotMatchingStrategyTest {
pendingRequests,
new HashMap<>() {
{
- put(tmLocation1.getResourceID(),
DefaultLoadingWeight.EMPTY);
- put(tmLocation2.getResourceID(), new
DefaultLoadingWeight(9));
+ put(tmLocation1.getResourceID(),
DefaultTaskExecutionLoad.EMPTY);
+ put(tmLocation2.getResourceID(), new
DefaultTaskExecutionLoad(9));
}
});
assertThat(requestSlotMatches).hasSize(2);
@@ -108,7 +108,7 @@ class TasksBalancedRequestSlotMatchingStrategyTest {
return PendingRequest.createNormalRequest(
new SlotRequestId(),
requestProfile,
- new DefaultLoadingWeight(loading),
+ new DefaultTaskExecutionLoad(loading),
Collections.emptyList());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 6a1028fb26a..bca552bc185 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -23,7 +23,7 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -89,7 +89,7 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
- private final Supplier<Map<ResourceID, LoadingWeight>>
taskExecutorsLoadingWeightSupplier;
+ private final Supplier<Map<ResourceID, TaskExecutionLoad>>
taskExecutionLoadMapSupplier;
TestingDeclarativeSlotPool(
Consumer<ResourceCounter> increaseResourceRequirementsByConsumer,
@@ -120,7 +120,7 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
Function<AllocationID, Boolean> containsFreeSlotFunction,
LongConsumer releaseIdleSlotsConsumer,
Consumer<ResourceCounter> setResourceRequirementsConsumer,
- Supplier<Map<ResourceID, LoadingWeight>>
taskExecutorsLoadingWeightSupplier) {
+ Supplier<Map<ResourceID, TaskExecutionLoad>>
taskExecutionLoadMapSupplier) {
this.increaseResourceRequirementsByConsumer =
increaseResourceRequirementsByConsumer;
this.decreaseResourceRequirementsByConsumer =
decreaseResourceRequirementsByConsumer;
this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -137,7 +137,7 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
this.containsFreeSlotFunction = containsFreeSlotFunction;
this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
- this.taskExecutorsLoadingWeightSupplier =
taskExecutorsLoadingWeightSupplier;
+ this.taskExecutionLoadMapSupplier = taskExecutionLoadMapSupplier;
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index 6b5960ee067..1435ace092c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -23,7 +23,7 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -83,7 +83,7 @@ public class TestingDeclarativeSlotPoolBuilder {
Collection<SlotOffer>>
registerSlotsFunction =
(slotOffers, ignoredB, ignoredC, ignoredD) -> new
ArrayList<>(slotOffers);
- private Supplier<Map<ResourceID, LoadingWeight>>
taskExecutorsLoadingWeightSupplier =
+ private Supplier<Map<ResourceID, TaskExecutionLoad>>
taskExecutionLoadMapSupplier =
HashMap::new;
public TestingDeclarativeSlotPoolBuilder
setIncreaseResourceRequirementsByConsumer(
@@ -213,6 +213,6 @@ public class TestingDeclarativeSlotPoolBuilder {
containsFreeSlotFunction,
returnIdleSlotsConsumer,
setResourceRequirementsConsumer,
- taskExecutorsLoadingWeightSupplier);
+ taskExecutionLoadMapSupplier);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
index e2be038d345..5a3771dfe66 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -22,9 +22,9 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -158,11 +158,11 @@ class TasksBalancedSlotMatchingResolverTest extends
AbstractSlotMatchingResolver
s.getTargetAs(
ExecutionSlotSharingGroup
.class)
-
.getLoading())
+
.getTaskExecutionLoad())
.reduce(
-
DefaultLoadingWeight.EMPTY,
-
LoadingWeight::merge)
- .getLoading())
+
DefaultTaskExecutionLoad.EMPTY,
+
TaskExecutionLoad::merge)
+ .getLoadValue())
.isGreaterThanOrEqualTo(9f);
});
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
similarity index 60%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
index 4d4203dc668..57cf1e82f76 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
@@ -16,26 +16,36 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test for {@link DefaultLoadingWeight}. */
-class DefaultLoadingWeightTest {
+/** Test for {@link DefaultTaskExecutionLoad}. */
+class DefaultTaskExecutionLoadTest {
@Test
void testInvalidLoading() {
- assertThatThrownBy(() -> new DefaultLoadingWeight(-1f))
+ assertThatThrownBy(() -> new DefaultTaskExecutionLoad(-1f))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void testMerge() {
- assertThat(new
DefaultLoadingWeight(0).merge(null).getLoading()).isZero();
- assertThat(new DefaultLoadingWeight(0).merge(new
DefaultLoadingWeight(1.2f)).getLoading())
+ assertThat(new
DefaultTaskExecutionLoad(0).merge(null).getLoadValue()).isZero();
+ assertThat(
+ new DefaultTaskExecutionLoad(0)
+ .merge(new DefaultTaskExecutionLoad(1.2f))
+ .getLoadValue())
.isEqualTo(1.2f);
}
+
+ @Test
+ void testGetLoadValueAsInt() {
+ assertThat(new
DefaultTaskExecutionLoad(2.9f).getLoadValueAsInt()).isEqualTo(2);
+ assertThat(new DefaultTaskExecutionLoad(3e10f).getLoadValueAsInt())
+ .isEqualTo(Integer.MAX_VALUE);
+ }
}