This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ab49fae9960 [FLINK-38799][runtime] Rename `LoadingWeight` to `TaskExecutionLoad` (#27333)
ab49fae9960 is described below

commit ab49fae99606b87d8facc7eb404bda1b27bf9c47
Author: Ferenc Csaky <[email protected]>
AuthorDate: Fri Jan 23 10:41:53 2026 +0100

    [FLINK-38799][runtime] Rename `LoadingWeight` to `TaskExecutionLoad` (#27333)
---
 .../slotpool/DeclarativeSlotPoolBridge.java        | 21 +++----
 .../runtime/jobmaster/slotpool/PendingRequest.java | 28 ++++-----
 .../jobmaster/slotpool/PhysicalSlotRequest.java    | 20 +++----
 ...erredAllocationRequestSlotMatchingStrategy.java | 12 ++--
 .../slotpool/RequestSlotMatchingStrategy.java      |  6 +-
 .../SimpleRequestSlotMatchingStrategy.java         |  4 +-
 .../TasksBalancedRequestSlotMatchingStrategy.java  | 70 ++++++++++++----------
 .../scheduler/ExecutionSlotSharingGroup.java       | 16 ++---
 .../scheduler/SimpleExecutionSlotAllocator.java    |  4 +-
 .../SlotSharingExecutionSlotAllocator.java         |  2 +-
 .../allocator/SlotSharingSlotAllocator.java        | 12 ++--
 .../TasksBalancedSlotMatchingResolver.java         | 39 ++++++------
 .../runtime/scheduler/loading/LoadingWeight.java   | 43 -------------
 .../DefaultTaskExecutionLoad.java}                 | 51 ++++++++--------
 .../HasTaskExecutionLoad.java}                     | 22 ++++---
 .../scheduler/taskexecload/TaskExecutionLoad.java  | 65 ++++++++++++++++++++
 .../slotpool/PhysicalSlotProviderExtension.java    |  4 +-
 ...lSlotProviderImplWithSpreadOutStrategyTest.java |  4 +-
 .../slotpool/PhysicalSlotRequestUtils.java         |  6 +-
 ...dAllocationRequestSlotMatchingStrategyTest.java |  8 +--
 .../slotpool/ResourceRequestPreMappingsTest.java   |  4 +-
 .../SimpleRequestSlotMatchingStrategyTest.java     | 10 ++--
 ...sksBalancedRequestSlotMatchingStrategyTest.java |  8 +--
 .../slotpool/TestingDeclarativeSlotPool.java       |  8 +--
 .../TestingDeclarativeSlotPoolBuilder.java         |  6 +-
 .../AbstractSlotMatchingResolverTest.java          | 12 ++--
 .../DefaultTaskExecutionLoadTest.java}             | 22 +++++--
 27 files changed, 273 insertions(+), 234 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index a55e8c80b2f..2fdd45b5018 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -66,13 +66,13 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
     private static final class FulfilledAllocation {
         final AllocationID allocationID;
         final ResourceID taskExecutorID;
-        final LoadingWeight loadingWeight;
+        final TaskExecutionLoad taskExecutionLoad;
 
-        FulfilledAllocation(PhysicalSlot slot, LoadingWeight loadingWeight) {
+        FulfilledAllocation(PhysicalSlot slot, TaskExecutionLoad 
taskExecutionLoad) {
             this.allocationID = 
Preconditions.checkNotNull(slot.getAllocationId());
             this.taskExecutorID =
                     
Preconditions.checkNotNull(slot.getTaskManagerLocation().getResourceID());
-            this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
+            this.taskExecutionLoad = 
Preconditions.checkNotNull(taskExecutionLoad);
         }
 
         @Override
@@ -83,7 +83,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             FulfilledAllocation that = (FulfilledAllocation) o;
             return Objects.equals(allocationID, that.allocationID)
                     && Objects.equals(taskExecutorID, that.taskExecutorID)
-                    && Objects.equals(loadingWeight, that.loadingWeight);
+                    && Objects.equals(taskExecutionLoad, 
that.taskExecutionLoad);
         }
     }
 
@@ -261,16 +261,16 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         }
     }
 
-    private Map<ResourceID, LoadingWeight> getTaskExecutorsLoadingView() {
-        final Map<ResourceID, LoadingWeight> result = new HashMap<>();
+    private Map<ResourceID, TaskExecutionLoad> getTaskExecutorsLoadingView() {
+        final Map<ResourceID, TaskExecutionLoad> result = new HashMap<>();
         Collection<FulfilledAllocation> fulfilledAllocations = 
fulfilledRequests.values();
         for (FulfilledAllocation allocation : fulfilledAllocations) {
             result.compute(
                     allocation.taskExecutorID,
                     (ignoredID, oldLoading) ->
                             Objects.isNull(oldLoading)
-                                    ? allocation.loadingWeight
-                                    : 
oldLoading.merge(allocation.loadingWeight));
+                                    ? allocation.taskExecutionLoad
+                                    : 
oldLoading.merge(allocation.taskExecutionLoad));
         }
         return result;
     }
@@ -353,7 +353,8 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
                 getDeclarativeSlotPool()
                         .reserveFreeSlot(allocationId, 
pendingRequest.getResourceProfile());
         fulfilledRequests.put(
-                slotRequestId, new FulfilledAllocation(slot, 
pendingRequest.getLoading()));
+                slotRequestId,
+                new FulfilledAllocation(slot, 
pendingRequest.getTaskExecutionLoad()));
         return slot;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
index 339db7f575f..565be31b8f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -32,13 +32,13 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-public final class PendingRequest implements WeightLoadable {
+public final class PendingRequest implements HasTaskExecutionLoad {
 
     private final SlotRequestId slotRequestId;
 
     private final ResourceProfile resourceProfile;
 
-    private final LoadingWeight loadingWeight;
+    private final TaskExecutionLoad taskExecutionLoad;
 
     private final HashSet<AllocationID> preferredAllocations;
 
@@ -51,12 +51,12 @@ public final class PendingRequest implements WeightLoadable 
{
     private PendingRequest(
             SlotRequestId slotRequestId,
             ResourceProfile resourceProfile,
-            LoadingWeight loadingWeight,
+            TaskExecutionLoad taskExecutionLoad,
             Collection<AllocationID> preferredAllocations,
             boolean isBatchRequest) {
         this.slotRequestId = slotRequestId;
         this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
-        this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
+        this.taskExecutionLoad = Preconditions.checkNotNull(taskExecutionLoad);
         this.preferredAllocations = new HashSet<>(preferredAllocations);
         this.isBatchRequest = isBatchRequest;
         this.slotFuture = new CompletableFuture<>();
@@ -66,19 +66,19 @@ public final class PendingRequest implements WeightLoadable 
{
     static PendingRequest createBatchRequest(
             SlotRequestId slotRequestId,
             ResourceProfile resourceProfile,
-            LoadingWeight loadingWeight,
+            TaskExecutionLoad taskExecutionLoad,
             Collection<AllocationID> preferredAllocations) {
         return new PendingRequest(
-                slotRequestId, resourceProfile, loadingWeight, 
preferredAllocations, true);
+                slotRequestId, resourceProfile, taskExecutionLoad, 
preferredAllocations, true);
     }
 
     public static PendingRequest createNormalRequest(
             SlotRequestId slotRequestId,
             ResourceProfile resourceProfile,
-            LoadingWeight loadingWeight,
+            TaskExecutionLoad taskExecutionLoad,
             Collection<AllocationID> preferredAllocations) {
         return new PendingRequest(
-                slotRequestId, resourceProfile, loadingWeight, 
preferredAllocations, false);
+                slotRequestId, resourceProfile, taskExecutionLoad, 
preferredAllocations, false);
     }
 
     SlotRequestId getSlotRequestId() {
@@ -134,8 +134,8 @@ public final class PendingRequest implements WeightLoadable 
{
                 + slotRequestId
                 + ", resourceProfile="
                 + resourceProfile
-                + ", loadingWeight="
-                + loadingWeight
+                + ", taskExecutionLoad="
+                + taskExecutionLoad
                 + ", preferredAllocations="
                 + preferredAllocations
                 + ", isBatchRequest="
@@ -146,7 +146,7 @@ public final class PendingRequest implements WeightLoadable 
{
     }
 
     @Override
-    public @Nonnull LoadingWeight getLoading() {
-        return loadingWeight;
+    public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
+        return taskExecutionLoad;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
index 08a9bfe5dbf..1a48cefadee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -21,31 +21,31 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 
 import javax.annotation.Nonnull;
 
 /** Represents a request for a physical slot. */
-public class PhysicalSlotRequest implements WeightLoadable {
+public class PhysicalSlotRequest implements HasTaskExecutionLoad {
 
     private final SlotRequestId slotRequestId;
 
     private final SlotProfile slotProfile;
 
-    private final LoadingWeight loadingWeight;
+    private final TaskExecutionLoad taskExecutionLoad;
 
     private final boolean slotWillBeOccupiedIndefinitely;
 
     public PhysicalSlotRequest(
             final SlotRequestId slotRequestId,
             final SlotProfile slotProfile,
-            final LoadingWeight loadingWeight,
+            final TaskExecutionLoad taskExecutionLoad,
             final boolean slotWillBeOccupiedIndefinitely) {
 
         this.slotRequestId = slotRequestId;
         this.slotProfile = slotProfile;
-        this.loadingWeight = loadingWeight;
+        this.taskExecutionLoad = taskExecutionLoad;
         this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
     }
 
@@ -70,18 +70,18 @@ public class PhysicalSlotRequest implements WeightLoadable {
                 ? PendingRequest.createNormalRequest(
                         slotRequestId,
                         slotProfile.getPhysicalSlotResourceProfile(),
-                        loadingWeight,
+                        taskExecutionLoad,
                         slotProfile.getPreferredAllocations())
                 : PendingRequest.createBatchRequest(
                         slotRequestId,
                         slotProfile.getPhysicalSlotResourceProfile(),
-                        loadingWeight,
+                        taskExecutionLoad,
                         slotProfile.getPreferredAllocations());
     }
 
     @Override
-    public @Nonnull LoadingWeight getLoading() {
-        return loadingWeight;
+    public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
+        return taskExecutionLoad;
     }
 
     /** Result of a {@link PhysicalSlotRequest}. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
index d6654b7a254..7756e932002 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -58,7 +58,7 @@ public class PreferredAllocationRequestSlotMatchingStrategy 
implements RequestSl
     public Collection<RequestSlotMatch> matchRequestsAndSlots(
             Collection<? extends PhysicalSlot> slots,
             Collection<PendingRequest> pendingRequests,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
         final Collection<RequestSlotMatch> requestSlotMatches = new 
ArrayList<>();
 
         final Map<AllocationID, PhysicalSlot> freeSlots =
@@ -97,11 +97,11 @@ public class PreferredAllocationRequestSlotMatchingStrategy 
implements RequestSl
                                 .getPreferredAllocations()
                                 .contains(freeSlot.getAllocationId())) {
                     
requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot));
-                    LoadingWeight deltaLoading = pendingRequest.getLoading();
-                    taskExecutorsLoadingWeight.compute(
+                    TaskExecutionLoad deltaLoad = 
pendingRequest.getTaskExecutionLoad();
+                    taskExecutionLoadMap.compute(
                             freeSlot.getTaskManagerLocation().getResourceID(),
                             (ignoredId, oldLoad) ->
-                                    oldLoad == null ? deltaLoading : 
deltaLoading.merge(oldLoad));
+                                    oldLoad == null ? deltaLoad : 
deltaLoad.merge(oldLoad));
                     pendingRequestIterator.remove();
                     freeSlotsIterator.remove();
                     break;
@@ -113,7 +113,7 @@ public class PreferredAllocationRequestSlotMatchingStrategy 
implements RequestSl
         if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
             requestSlotMatches.addAll(
                     rollbackStrategy.matchRequestsAndSlots(
-                            freeSlots.values(), unmatchedRequests, 
taskExecutorsLoadingWeight));
+                            freeSlots.values(), unmatchedRequests, 
taskExecutionLoadMap));
         }
 
         return requestSlotMatches;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
index d4774d72f5d..fe0dec37bfe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 
 import java.util.Collection;
 import java.util.Map;
@@ -32,13 +32,13 @@ public interface RequestSlotMatchingStrategy {
      *
      * @param slots slots to match
      * @param pendingRequests slot requests to match
-     * @param taskExecutorsLoadingWeight current task executors loading weight 
information
+     * @param taskExecutionLoadMap task execution load information by resource
      * @return resulting matches of this operation
      */
     Collection<RequestSlotMatch> matchRequestsAndSlots(
             Collection<? extends PhysicalSlot> slots,
             Collection<PendingRequest> pendingRequests,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight);
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap);
 
     /** Result class representing matches. */
     final class RequestSlotMatch {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
index cf354afbcad..660d6d95d87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,7 +38,7 @@ public enum SimpleRequestSlotMatchingStrategy implements 
RequestSlotMatchingStra
     public Collection<RequestSlotMatch> matchRequestsAndSlots(
             Collection<? extends PhysicalSlot> slots,
             Collection<PendingRequest> pendingRequests,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
         final Collection<RequestSlotMatch> resultingMatches = new 
ArrayList<>();
 
         // if pendingRequests has a special order, then let's preserve it
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
index c70f15ad17e..7119e43fab7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
@@ -42,7 +42,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+import static 
org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad.sortByTaskExecutionLoadDesc;
 
 /**
  * The tasks balanced based implementation of {@link 
RequestSlotMatchingStrategy} that matches the
@@ -57,20 +57,20 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
     /** The {@link PhysicalSlotElement} comparator to compare loading. */
     static final class PhysicalSlotElementComparator implements 
Comparator<PhysicalSlotElement> {
 
-        private final Map<ResourceID, LoadingWeight> taskExecutorsLoading;
+        private final Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap;
 
-        PhysicalSlotElementComparator(Map<ResourceID, LoadingWeight> 
taskExecutorsLoading) {
-            this.taskExecutorsLoading = 
Preconditions.checkNotNull(taskExecutorsLoading);
+        PhysicalSlotElementComparator(Map<ResourceID, TaskExecutionLoad> 
taskExecutionLoadMap) {
+            this.taskExecutionLoadMap = 
Preconditions.checkNotNull(taskExecutionLoadMap);
         }
 
         @Override
         public int compare(PhysicalSlotElement left, PhysicalSlotElement 
right) {
-            final LoadingWeight leftLoad =
-                    taskExecutorsLoading.getOrDefault(
-                            left.getResourceID(), DefaultLoadingWeight.EMPTY);
-            final LoadingWeight rightLoad =
-                    taskExecutorsLoading.getOrDefault(
-                            right.getResourceID(), DefaultLoadingWeight.EMPTY);
+            final TaskExecutionLoad leftLoad =
+                    taskExecutionLoadMap.getOrDefault(
+                            left.getResourceID(), 
DefaultTaskExecutionLoad.EMPTY);
+            final TaskExecutionLoad rightLoad =
+                    taskExecutionLoadMap.getOrDefault(
+                            right.getResourceID(), 
DefaultTaskExecutionLoad.EMPTY);
             return leftLoad.compareTo(rightLoad);
         }
     }
@@ -115,9 +115,10 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
 
         private final PhysicalSlotElementComparator 
physicalSlotElementComparator;
 
-        PhysicalSlotElementPriorityComparator(Map<ResourceID, LoadingWeight> 
taskExecutorsLoading) {
+        PhysicalSlotElementPriorityComparator(
+                Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
             this.physicalSlotElementComparator =
-                    new PhysicalSlotElementComparator(taskExecutorsLoading);
+                    new PhysicalSlotElementComparator(taskExecutionLoadMap);
         }
 
         @Override
@@ -130,7 +131,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
     public Collection<RequestSlotMatch> matchRequestsAndSlots(
             Collection<? extends PhysicalSlot> slots,
             Collection<PendingRequest> pendingRequests,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
         ResourceRequestPreMappings resourceRequestPreMappings =
                 ResourceRequestPreMappings.createFrom(pendingRequests, slots);
         if (!resourceRequestPreMappings.isMatchingFulfilled()) {
@@ -138,24 +139,27 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
         }
 
         final Collection<RequestSlotMatch> resultingMatches = new 
ArrayList<>();
-        final List<PendingRequest> sortedRequests = 
sortByLoadingDescend(pendingRequests);
+        final List<PendingRequest> sortedRequests = 
sortByTaskExecutionLoadDesc(pendingRequests);
 
-        logDebugInfo(slots, taskExecutorsLoad, sortedRequests);
+        logDebugInfo(slots, taskExecutionLoadMap, sortedRequests);
 
         Collection<PhysicalSlotElement> slotElements =
                 
slots.stream().map(PhysicalSlotElement::new).collect(Collectors.toList());
         final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> 
profileSlots =
-                getSlotCandidatesByProfile(slotElements, taskExecutorsLoad);
+                getSlotCandidatesByProfile(slotElements, taskExecutionLoadMap);
         final Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots =
                 groupSlotsByTaskExecutor(slotElements);
         for (PendingRequest request : sortedRequests) {
             ResourceProfile requestProfile = request.getResourceProfile();
             Optional<PhysicalSlotElement> bestSlotEleOpt =
                     tryMatchPhysicalSlot(
-                            request, profileSlots, taskExecutorsLoad, 
resourceRequestPreMappings);
+                            request,
+                            profileSlots,
+                            taskExecutionLoadMap,
+                            resourceRequestPreMappings);
             if (bestSlotEleOpt.isPresent()) {
                 PhysicalSlotElement slotElement = bestSlotEleOpt.get();
-                updateTaskExecutorsLoad(taskExecutorsLoad, request, 
slotElement);
+                updateTaskExecutorsLoad(taskExecutionLoadMap, request, 
slotElement);
                 updateReferenceRemainingSlots(profileSlots, taskExecutorSlots, 
slotElement);
                 resourceRequestPreMappings.decrease(
                         requestProfile, slotElement.getResourceProfile());
@@ -166,26 +170,26 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
     }
 
     private static void updateTaskExecutorsLoad(
-            Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
             PendingRequest request,
             PhysicalSlotElement slotElement) {
-        taskExecutorsLoad.compute(
+        taskExecutionLoadMap.compute(
                 slotElement.getResourceID(),
-                (ignoredId, oldLoading) ->
-                        Objects.isNull(oldLoading)
-                                ? request.getLoading()
-                                : oldLoading.merge(request.getLoading()));
+                (ignoredId, oldLoad) ->
+                        Objects.isNull(oldLoad)
+                                ? request.getTaskExecutionLoad()
+                                : 
oldLoad.merge(request.getTaskExecutionLoad()));
     }
 
     private static void logDebugInfo(
             Collection<? extends PhysicalSlot> slots,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
             List<PendingRequest> sortedRequests) {
         LOG.debug(
-                "Available slots: {}, sortedRequests: {}, taskExecutorsLoad: 
{}",
+                "Available slots: {}, sortedRequests: {}, 
taskExecutionLoadMap: {}",
                 slots,
                 sortedRequests,
-                taskExecutorsLoad);
+                taskExecutionLoadMap);
     }
 
     private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(
@@ -198,10 +202,10 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
 
     private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> 
getSlotCandidatesByProfile(
             Collection<PhysicalSlotElement> slotElements,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
         final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> 
result = new HashMap<>();
         final PhysicalSlotElementPriorityComparator 
physicalSlotElementPriorityComparator =
-                new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
+                new 
PhysicalSlotElementPriorityComparator(taskExecutionLoadMap);
         for (PhysicalSlotElement slotEle : slotElements) {
             result.compute(
                     slotEle.getResourceProfile(),
@@ -221,7 +225,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
     private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(
             PendingRequest request,
             Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> 
profileToSlotMap,
-            Map<ResourceID, LoadingWeight> taskExecutorsLoad,
+            Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap,
             ResourceRequestPreMappings resourceRequestPreMappings) {
         final ResourceProfile requestProfile = request.getResourceProfile();
 
@@ -242,7 +246,7 @@ public enum TasksBalancedRequestSlotMatchingStrategy 
implements RequestSlotMatch
                             return Objects.isNull(slots) ? null : slots.peek();
                         })
                 .filter(Objects::nonNull)
-                .min(new PhysicalSlotElementComparator(taskExecutorsLoad));
+                .min(new PhysicalSlotElementComparator(taskExecutionLoadMap));
     }
 
     private void updateReferenceRemainingSlots(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
index 7f3dc35d5ed..33d9bbc21f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -33,7 +33,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 /** Represents execution vertices that will run the same shared slot. */
-public class ExecutionSlotSharingGroup implements WeightLoadable {
+public class ExecutionSlotSharingGroup implements HasTaskExecutionLoad {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
@@ -69,14 +69,14 @@ public class ExecutionSlotSharingGroup implements 
WeightLoadable {
                 + executionVertexIds
                 + ", slotSharingGroup="
                 + slotSharingGroup
-                + ", loadingWeight="
-                + getLoading()
+                + ", taskExecutionLoad="
+                + getTaskExecutionLoad()
                 + '}';
     }
 
     @Nonnull
     @Override
-    public LoadingWeight getLoading() {
-        return new DefaultLoadingWeight(executionVertexIds.size());
+    public TaskExecutionLoad getTaskExecutionLoad() {
+        return new DefaultTaskExecutionLoad(executionVertexIds.size());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
index 0a5e79777b2..74d46e82b40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.DualKeyLinkedMap;
 import org.apache.flink.util.FlinkException;
@@ -109,7 +109,7 @@ public class SimpleExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                         new PhysicalSlotRequest(
                                 slotRequestId,
                                 slotProfile,
-                                DefaultLoadingWeight.EMPTY,
+                                DefaultTaskExecutionLoad.EMPTY,
                                 slotWillBeOccupiedIndefinitely);
                 physicalSlotRequests.add(request);
                 remainingExecutionsToSlotRequest.put(slotRequestId, 
executionAttemptId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 00a8ebd020c..7f926444390 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -272,7 +272,7 @@ class SlotSharingExecutionSlotAllocator implements 
ExecutionSlotAllocator {
                     new PhysicalSlotRequest(
                             physicalSlotRequestId,
                             slotProfile,
-                            group.getLoading(),
+                            group.getTaskExecutionLoad(),
                             slotWillBeOccupiedIndefinitely);
             slotRequests.add(request);
             requestToGroup.put(physicalSlotRequestId, group);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 39bfa7902a3..7d8149efdf9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -28,10 +28,10 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 
@@ -344,7 +344,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     /** The execution slot sharing group for adaptive scheduler. */
-    public static class ExecutionSlotSharingGroup implements WeightLoadable {
+    public static class ExecutionSlotSharingGroup implements 
HasTaskExecutionLoad {
         private final String id;
         private final SlotSharingGroup slotSharingGroup;
         private final Set<ExecutionVertexID> containedExecutionVertices;
@@ -382,8 +382,8 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
 
         @Nonnull
         @Override
-        public LoadingWeight getLoading() {
-            return new DefaultLoadingWeight(containedExecutionVertices.size());
+        public TaskExecutionLoad getTaskExecutionLoad() {
+            return new 
DefaultTaskExecutionLoad(containedExecutionVertices.size());
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
index d4ee374227e..35623e2c77f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.ArrayList;
@@ -35,7 +35,7 @@ import java.util.TreeMap;
 
 import static 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import static 
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+import static 
org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad.sortByTaskExecutionLoadDesc;
 
 /** The tasks balanced request slot matching resolver implementation. */
 public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver {
@@ -49,17 +49,17 @@ public enum TasksBalancedSlotMatchingResolver implements 
SlotMatchingResolver {
                 new ArrayList<>(requestGroups.size());
         final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
                 AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
-        final TreeMap<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap =
-                getLoadingSlotsMap(freeSlots);
+        final TreeMap<TaskExecutionLoad, Set<PhysicalSlot>> loadingSlotsMap =
+                getSlotsByTaskExecutionLoad(freeSlots);
 
-        SlotTaskExecutorWeight<LoadingWeight> best;
-        for (ExecutionSlotSharingGroup requestGroup : 
sortByLoadingDescend(requestGroups)) {
-            best = getTheBestSlotTaskExecutorLoading(loadingSlotsMap);
+        SlotTaskExecutorWeight<TaskExecutionLoad> best;
+        for (ExecutionSlotSharingGroup requestGroup : 
sortByTaskExecutionLoadDesc(requestGroups)) {
+            best = getLeastLoadedTaskSlot(loadingSlotsMap);
             slotAssignments.add(new SlotAssignment(best.physicalSlot, 
requestGroup));
 
             // Update the references
-            final LoadingWeight newLoading =
-                    best.taskExecutorWeight.merge(requestGroup.getLoading());
+            final TaskExecutionLoad newLoading =
+                    
best.taskExecutorWeight.merge(requestGroup.getTaskExecutionLoad());
             updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
             Set<PhysicalSlot> physicalSlots = 
slotsPerTaskExecutor.get(best.getResourceID());
             updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots, 
newLoading);
@@ -68,10 +68,10 @@ public enum TasksBalancedSlotMatchingResolver implements 
SlotMatchingResolver {
     }
 
     private static void updateLoadingSlotsMap(
-            Map<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap,
-            SlotTaskExecutorWeight<LoadingWeight> best,
+            Map<TaskExecutionLoad, Set<PhysicalSlot>> loadingSlotsMap,
+            SlotTaskExecutorWeight<TaskExecutionLoad> best,
             Set<PhysicalSlot> slotsToAdjust,
-            LoadingWeight newLoading) {
+            TaskExecutionLoad newLoading) {
         Set<PhysicalSlot> physicalSlots = 
loadingSlotsMap.get(best.taskExecutorWeight);
         if (!CollectionUtil.isNullOrEmpty(physicalSlots)) {
             physicalSlots.remove(best.physicalSlot);
@@ -96,7 +96,7 @@ public enum TasksBalancedSlotMatchingResolver implements 
SlotMatchingResolver {
 
     private static void updateSlotsPerTaskExecutor(
             Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
-            SlotTaskExecutorWeight<LoadingWeight> best) {
+            SlotTaskExecutorWeight<TaskExecutionLoad> best) {
         Set<PhysicalSlot> slots = 
slotsPerTaskExecutor.get(best.getResourceID());
         if (Objects.nonNull(slots)) {
             slots.remove(best.physicalSlot);
@@ -106,21 +106,22 @@ public enum TasksBalancedSlotMatchingResolver implements 
SlotMatchingResolver {
         }
     }
 
-    private static TreeMap<LoadingWeight, Set<PhysicalSlot>> 
getLoadingSlotsMap(
+    private static TreeMap<TaskExecutionLoad, Set<PhysicalSlot>> 
getSlotsByTaskExecutionLoad(
             Collection<PhysicalSlot> slots) {
         return new TreeMap<>() {
             {
                 HashSet<PhysicalSlot> slotsValue =
                         
CollectionUtil.newHashSetWithExpectedSize(slots.size());
                 slotsValue.addAll(slots);
-                put(DefaultLoadingWeight.EMPTY, slotsValue);
+                put(DefaultTaskExecutionLoad.EMPTY, slotsValue);
             }
         };
     }
 
-    private static SlotTaskExecutorWeight<LoadingWeight> 
getTheBestSlotTaskExecutorLoading(
-            TreeMap<LoadingWeight, Set<PhysicalSlot>> slotsByLoading) {
-        final Map.Entry<LoadingWeight, Set<PhysicalSlot>> firstEntry = 
slotsByLoading.firstEntry();
+    private static SlotTaskExecutorWeight<TaskExecutionLoad> 
getLeastLoadedTaskSlot(
+            TreeMap<TaskExecutionLoad, Set<PhysicalSlot>> 
slotsByTaskExecutionLoad) {
+        final Map.Entry<TaskExecutionLoad, Set<PhysicalSlot>> firstEntry =
+                slotsByTaskExecutionLoad.firstEntry();
         if (firstEntry == null
                 || firstEntry.getKey() == null
                 || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
deleted file mode 100644
index 80ec766cc6a..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.loading;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-
-/** The class is used to represent the loading weight abstraction. */
-@Internal
-public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable 
{
-
-    /**
-     * Get the loading value.
-     *
-     * @return A float represented the loading.
-     */
-    float getLoading();
-
-    /**
-     * Merge the other loading weight and this one into a new object.
-     *
-     * @param other A loading weight object.
-     * @return The new merged {@link LoadingWeight}.
-     */
-    LoadingWeight merge(LoadingWeight other);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
similarity index 52%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
index c7c41505ae2..2b41275e0e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoad.java
@@ -16,44 +16,47 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
 
 import java.util.Objects;
 
-/** The default implementation of {@link LoadingWeight}. */
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The default implementation of {@link TaskExecutionLoad}. */
 @Internal
-public class DefaultLoadingWeight implements LoadingWeight {
+public class DefaultTaskExecutionLoad implements TaskExecutionLoad {
 
-    public static final LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+    public static final TaskExecutionLoad EMPTY = new 
DefaultTaskExecutionLoad(0f);
 
-    private final float loading;
+    private final float loadValue;
 
-    public DefaultLoadingWeight(float loading) {
-        Preconditions.checkArgument(loading >= 0.0f);
-        this.loading = loading;
+    public DefaultTaskExecutionLoad(float loadValue) {
+        checkArgument(loadValue >= 0.0f);
+        this.loadValue = loadValue;
     }
 
     @Override
-    public float getLoading() {
-        return loading;
+    public float getLoadValue() {
+        return loadValue;
     }
 
     @Override
-    public LoadingWeight merge(LoadingWeight other) {
-        if (other == null) {
-            return this;
-        }
-        return new DefaultLoadingWeight(loading + other.getLoading());
+    public int getLoadValueAsInt() {
+        return (int) loadValue;
+    }
+
+    @Override
+    public TaskExecutionLoad merge(TaskExecutionLoad other) {
+        return other == null
+                ? this
+                : new DefaultTaskExecutionLoad(loadValue + 
other.getLoadValue());
     }
 
     @Override
-    public int compareTo(@Nonnull LoadingWeight o) {
-        return Float.compare(loading, o.getLoading());
+    public int compareTo(TaskExecutionLoad o) {
+        return Float.compare(loadValue, o.getLoadValue());
     }
 
     @Override
@@ -64,17 +67,17 @@ public class DefaultLoadingWeight implements LoadingWeight {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        DefaultLoadingWeight that = (DefaultLoadingWeight) o;
-        return Float.compare(loading, that.loading) == 0f;
+        DefaultTaskExecutionLoad that = (DefaultTaskExecutionLoad) o;
+        return compareTo(that) == 0f;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(loading);
+        return Objects.hash(loadValue);
     }
 
     @Override
     public String toString() {
-        return "DefaultLoadingWeight{loading=" + loading + '}';
+        return "DefaultTaskExecutionLoad{load=" + loadValue + '}';
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
similarity index 65%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
index 72f8b994ab2..2f6d93590eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/HasTaskExecutionLoad.java
@@ -16,36 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
 
 import org.apache.flink.annotation.Internal;
 
-import javax.annotation.Nonnull;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * The interface that holds the {@link LoadingWeight} getter is required for 
corresponding
+ * The interface that holds the {@link TaskExecutionLoad} getter is required 
for corresponding
  * abstractions.
  */
 @Internal
-public interface WeightLoadable {
+public interface HasTaskExecutionLoad {
 
     /**
-     * Get the loading weight.
+     * Retrieves the task execution load.
      *
-     * @return An implementation object of {@link LoadingWeight}.
+     * @return a {@link TaskExecutionLoad} instance
      */
-    @Nonnull
-    LoadingWeight getLoading();
+    TaskExecutionLoad getTaskExecutionLoad();
 
-    static <T extends WeightLoadable> List<T> 
sortByLoadingDescend(Collection<T> weightLoadables) {
-        return weightLoadables.stream()
+    static <T extends HasTaskExecutionLoad> List<T> 
sortByTaskExecutionLoadDesc(Collection<T> c) {
+        return c.stream()
                 .sorted(
                         (leftReq, rightReq) ->
-                                
rightReq.getLoading().compareTo(leftReq.getLoading()))
+                                rightReq.getTaskExecutionLoad()
+                                        
.compareTo(leftReq.getTaskExecutionLoad()))
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
new file mode 100644
index 00000000000..842cb865c52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/taskexecload/TaskExecutionLoad.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.taskexecload;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * Abstraction describing the amount of {@code task execution} assigned to a 
scheduling target.
+ *
+ * <p>This is a scheduler-facing metric that supports comparing and 
aggregating placement choices.
+ * It is <b>not</b> a direct measurement of runtime resource pressure (for 
example CPU or memory
+ * utilization) on a {@code TaskExecutor}.
+ *
+ * <p>During scheduling, a load value may be associated with different 
scheduler concepts on both
+ * sides of a placement decision:
+ *
+ * <ul>
+ *   <li><b>Resource requests</b> (for example slot-sharing related groupings 
in the {@code
+ *       AdaptiveScheduler} or pending requests in the default scheduler)
+ *   <li><b>Resource entities</b> (for example {@code TaskManager} instances)
+ * </ul>
+ */
+@Internal
+public interface TaskExecutionLoad extends Comparable<TaskExecutionLoad>, 
Serializable {
+
+    /**
+     * Returns the task execution load value.
+     *
+     * @return the current task execution load value
+     */
+    float getLoadValue();
+
+    /**
+     * Returns the task execution load value as an integer, truncated if 
necessary.
+     *
+     * @return the task execution load value truncated to an {@code int}
+     */
+    int getLoadValueAsInt();
+
+    /**
+     * Merges another task execution load with this one into a new object.
+     *
+     * @param other another task execution load object
+     * @return the new merged {@link TaskExecutionLoad}.
+     */
+    TaskExecutionLoad merge(TaskExecutionLoad other);
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
index 3e1e3dfa2cb..f3652a51a2c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -102,7 +102,7 @@ public class PhysicalSlotProviderExtension implements 
BeforeEachCallback, AfterE
         return new PhysicalSlotRequest(
                 new SlotRequestId(),
                 SlotProfileTestingUtils.noLocality(ResourceProfile.UNKNOWN),
-                DefaultLoadingWeight.EMPTY,
+                DefaultTaskExecutionLoad.EMPTY,
                 false);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
index 00d42d98848..001a7fccc1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.junit.jupiter.api.Test;
@@ -85,7 +85,7 @@ class PhysicalSlotProviderImplWithSpreadOutStrategyTest {
                         SlotProfileTestingUtils.preferredLocality(
                                 ResourceProfile.ANY,
                                 
Collections.singleton(preferredTaskManagerLocation)),
-                        DefaultLoadingWeight.EMPTY,
+                        DefaultTaskExecutionLoad.EMPTY,
                         false);
         PhysicalSlotRequest.Result result1 =
                 physicalSlotProviderExtension.allocateSlot(request1).get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
index 7462f07c679..239e7990371 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -44,7 +44,7 @@ class PhysicalSlotRequestUtils {
                         Collections.emptyList(),
                         preferredAllocations,
                         Collections.emptySet()),
-                DefaultLoadingWeight.EMPTY,
+                DefaultTaskExecutionLoad.EMPTY,
                 true);
     }
 
@@ -71,7 +71,7 @@ class PhysicalSlotRequestUtils {
                         Collections.emptyList(),
                         Collections.emptyList(),
                         Collections.emptySet()),
-                DefaultLoadingWeight.EMPTY,
+                DefaultTaskExecutionLoad.EMPTY,
                 false);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
index 3a047d8b45b..5df1164d0b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLoggerExtension;
@@ -103,12 +103,12 @@ class PreferredAllocationRequestSlotMatchingStrategyTest {
                         PendingRequest.createNormalRequest(
                                 new SlotRequestId(),
                                 ResourceProfile.UNKNOWN,
-                                DefaultLoadingWeight.EMPTY,
+                                DefaultTaskExecutionLoad.EMPTY,
                                 Collections.singleton(allocationId2)),
                         PendingRequest.createNormalRequest(
                                 new SlotRequestId(),
                                 ResourceProfile.UNKNOWN,
-                                DefaultLoadingWeight.EMPTY,
+                                DefaultTaskExecutionLoad.EMPTY,
                                 Collections.singleton(allocationId1)));
 
         final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> 
requestSlotMatches =
@@ -204,7 +204,7 @@ class PreferredAllocationRequestSlotMatchingStrategyTest {
         return PendingRequest.createNormalRequest(
                 new SlotRequestId(),
                 requestProfile,
-                new DefaultLoadingWeight(loading),
+                new DefaultTaskExecutionLoad(loading),
                 preferAllocationIds);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
index c2bf3fd691b..a9d3dfe6ab1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.jupiter.api.Test;
@@ -357,7 +357,7 @@ class ResourceRequestPreMappingsTest {
                     PendingRequest.createNormalRequest(
                             new SlotRequestId(),
                             Preconditions.checkNotNull(requiredProfile),
-                            DefaultLoadingWeight.EMPTY,
+                            DefaultTaskExecutionLoad.EMPTY,
                             Collections.emptyList()));
         }
         return pendingRequests;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
index 32f834e6fcf..93c34ae2dd9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Iterators;
@@ -50,13 +50,13 @@ public class SimpleRequestSlotMatchingStrategyTest {
                 PendingRequest.createNormalRequest(
                         new SlotRequestId(),
                         ResourceProfile.UNKNOWN,
-                        DefaultLoadingWeight.EMPTY,
+                        DefaultTaskExecutionLoad.EMPTY,
                         Collections.emptyList());
         final PendingRequest pendingRequest2 =
                 PendingRequest.createNormalRequest(
                         new SlotRequestId(),
                         ResourceProfile.UNKNOWN,
-                        DefaultLoadingWeight.EMPTY,
+                        DefaultTaskExecutionLoad.EMPTY,
                         Collections.emptyList());
         final Collection<PendingRequest> pendingRequests =
                 Arrays.asList(pendingRequest1, pendingRequest2);
@@ -90,13 +90,13 @@ public class SimpleRequestSlotMatchingStrategyTest {
                 PendingRequest.createNormalRequest(
                         new SlotRequestId(),
                         large,
-                        DefaultLoadingWeight.EMPTY,
+                        DefaultTaskExecutionLoad.EMPTY,
                         Collections.emptyList());
         final PendingRequest pendingRequest2 =
                 PendingRequest.createNormalRequest(
                         new SlotRequestId(),
                         small,
-                        DefaultLoadingWeight.EMPTY,
+                        DefaultTaskExecutionLoad.EMPTY,
                         Collections.emptyList());
         final Collection<PendingRequest> pendingRequests =
                 Arrays.asList(pendingRequest1, pendingRequest2);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
index 706c3c8fefe..1b86f92e3fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -67,8 +67,8 @@ class TasksBalancedRequestSlotMatchingStrategyTest {
                         pendingRequests,
                         new HashMap<>() {
                             {
-                                put(tmLocation1.getResourceID(), DefaultLoadingWeight.EMPTY);
-                                put(tmLocation2.getResourceID(), new DefaultLoadingWeight(9));
+                                put(tmLocation1.getResourceID(), DefaultTaskExecutionLoad.EMPTY);
+                                put(tmLocation2.getResourceID(), new DefaultTaskExecutionLoad(9));
                             }
                         });
         assertThat(requestSlotMatches).hasSize(2);
@@ -108,7 +108,7 @@ class TasksBalancedRequestSlotMatchingStrategyTest {
         return PendingRequest.createNormalRequest(
                 new SlotRequestId(),
                 requestProfile,
-                new DefaultLoadingWeight(loading),
+                new DefaultTaskExecutionLoad(loading),
                 Collections.emptyList());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 6a1028fb26a..bca552bc185 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -89,7 +89,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
 
     private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
 
-    private final Supplier<Map<ResourceID, LoadingWeight>> taskExecutorsLoadingWeightSupplier;
+    private final Supplier<Map<ResourceID, TaskExecutionLoad>> taskExecutionLoadMapSupplier;
 
     TestingDeclarativeSlotPool(
             Consumer<ResourceCounter> increaseResourceRequirementsByConsumer,
@@ -120,7 +120,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
             Function<AllocationID, Boolean> containsFreeSlotFunction,
             LongConsumer releaseIdleSlotsConsumer,
             Consumer<ResourceCounter> setResourceRequirementsConsumer,
-            Supplier<Map<ResourceID, LoadingWeight>> taskExecutorsLoadingWeightSupplier) {
+            Supplier<Map<ResourceID, TaskExecutionLoad>> taskExecutionLoadMapSupplier) {
         this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer;
         this.decreaseResourceRequirementsByConsumer = decreaseResourceRequirementsByConsumer;
         this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -137,7 +137,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool {
         this.containsFreeSlotFunction = containsFreeSlotFunction;
         this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
         this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
-        this.taskExecutorsLoadingWeightSupplier = taskExecutorsLoadingWeightSupplier;
+        this.taskExecutionLoadMapSupplier = taskExecutionLoadMapSupplier;
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index 6b5960ee067..1435ace092c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -83,7 +83,7 @@ public class TestingDeclarativeSlotPoolBuilder {
                     Collection<SlotOffer>>
             registerSlotsFunction =
                     (slotOffers, ignoredB, ignoredC, ignoredD) -> new ArrayList<>(slotOffers);
-    private Supplier<Map<ResourceID, LoadingWeight>> taskExecutorsLoadingWeightSupplier =
+    private Supplier<Map<ResourceID, TaskExecutionLoad>> taskExecutionLoadMapSupplier =
             HashMap::new;
 
     public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer(
@@ -213,6 +213,6 @@ public class TestingDeclarativeSlotPoolBuilder {
                 containsFreeSlotFunction,
                 returnIdleSlotsConsumer,
                 setResourceRequirementsConsumer,
-                taskExecutorsLoadingWeightSupplier);
+                taskExecutionLoadMapSupplier);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
index e2be038d345..5a3771dfe66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
-import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
-import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
+import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -158,11 +158,11 @@ class TasksBalancedSlotMatchingResolverTest extends AbstractSlotMatchingResolver
                                                                     
s.getTargetAs(
                                                                                
     ExecutionSlotSharingGroup
                                                                                
             .class)
-                                                                            
.getLoading())
+                                                                            
.getTaskExecutionLoad())
                                                     .reduce(
-                                                            
DefaultLoadingWeight.EMPTY,
-                                                            
LoadingWeight::merge)
-                                                    .getLoading())
+                                                            
DefaultTaskExecutionLoad.EMPTY,
+                                                            
TaskExecutionLoad::merge)
+                                                    .getLoadValue())
                                     .isGreaterThanOrEqualTo(9f);
                         });
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
similarity index 60%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
index 4d4203dc668..57cf1e82f76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/taskexecload/DefaultTaskExecutionLoadTest.java
@@ -16,26 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.scheduler.loading;
+package org.apache.flink.runtime.scheduler.taskexecload;
 
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link DefaultLoadingWeight}. */
-class DefaultLoadingWeightTest {
+/** Test for {@link DefaultTaskExecutionLoad}. */
+class DefaultTaskExecutionLoadTest {
 
     @Test
     void testInvalidLoading() {
-        assertThatThrownBy(() -> new DefaultLoadingWeight(-1f))
+        assertThatThrownBy(() -> new DefaultTaskExecutionLoad(-1f))
                 .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     void testMerge() {
-        assertThat(new DefaultLoadingWeight(0).merge(null).getLoading()).isZero();
-        assertThat(new DefaultLoadingWeight(0).merge(new DefaultLoadingWeight(1.2f)).getLoading())
+        assertThat(new DefaultTaskExecutionLoad(0).merge(null).getLoadValue()).isZero();
+        assertThat(
+                        new DefaultTaskExecutionLoad(0)
+                                .merge(new DefaultTaskExecutionLoad(1.2f))
+                                .getLoadValue())
                 .isEqualTo(1.2f);
     }
+
+    @Test
+    void testGetLoadValueAsInt() {
+        assertThat(new DefaultTaskExecutionLoad(2.9f).getLoadValueAsInt()).isEqualTo(2);
+        assertThat(new DefaultTaskExecutionLoad(3e10f).getLoadValueAsInt())
+                .isEqualTo(Integer.MAX_VALUE);
+    }
 }

Reply via email to