xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1147090634
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -237,27 +239,133 @@ && canFulfillRequirement(effectiveProfile,
remainResource)) {
private static class InternalResourceInfo {
private final ResourceProfile defaultSlotProfile;
private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+ private final ResourceProfile totalProfile;
private ResourceProfile availableProfile;
+ private double utilization;
InternalResourceInfo(
ResourceProfile defaultSlotProfile,
+ ResourceProfile totalProfile,
ResourceProfile availableProfile,
BiConsumer<JobID, ResourceProfile> allocationConsumer) {
+
Preconditions.checkState(!defaultSlotProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!totalProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!availableProfile.equals(ResourceProfile.UNKNOWN));
this.defaultSlotProfile = defaultSlotProfile;
+ this.totalProfile = totalProfile;
this.availableProfile = availableProfile;
this.allocationConsumer = allocationConsumer;
+ this.utilization = updateUtilization();
+ }
+
+ boolean availableResourceMatchingRequest(ResourceProfile requirement) {
+ final ResourceProfile effectiveProfile =
+ getEffectiveResourceProfile(requirement,
defaultSlotProfile);
+ return availableProfile.allFieldsNoLessThan(effectiveProfile);
}
boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile
requirement) {
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(requirement,
defaultSlotProfile);
- if (availableProfile.allFieldsNoLessThan(effectiveProfile)) {
+ if (availableResourceMatchingRequest(requirement)) {
availableProfile = availableProfile.subtract(effectiveProfile);
allocationConsumer.accept(jobId, effectiveProfile);
+ utilization = updateUtilization();
return true;
} else {
return false;
}
}
+
+ private double updateUtilization() {
+ double cpuUtilization =
+ totalProfile
+ .getCpuCores()
+ .subtract(availableProfile.getCpuCores())
+ .getValue()
+ .doubleValue()
+ /
totalProfile.getCpuCores().getValue().doubleValue();
+ double memoryUtilization =
+ (double)
+ totalProfile
+ .getTotalMemory()
+
.subtract(availableProfile.getTotalMemory())
+ .getBytes()
+ / totalProfile.getTotalMemory().getBytes();
+ return Math.max(cpuUtilization, memoryUtilization);
+ }
+ }
+
+ private interface ResourceMatchingStrategy {
+
+ int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId);
+ }
+
+ private enum AnyMatchingResourceMatchingStrategy implements
ResourceMatchingStrategy {
+ INSTANCE;
+
+ @Override
+ public int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId) {
+ final Iterator<InternalResourceInfo> internalResourceInfoItr =
+ internalResources.iterator();
+ while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
+ final InternalResourceInfo currentTaskManager =
internalResourceInfoItr.next();
+ while (numUnfulfilled > 0
+ && currentTaskManager.tryAllocateSlotForJob(jobId,
requiredResource)) {
+ numUnfulfilled--;
+ }
+ if
(currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+ internalResourceInfoItr.remove();
+ }
+ }
+ return numUnfulfilled;
+ }
+ }
+
+ private enum LeastUtilizationResourceMatchingStrategy implements
ResourceMatchingStrategy {
+ INSTANCE;
+
+ @Override
+ public int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId) {
+ if (internalResources.isEmpty()) {
+ return numUnfulfilled;
+ }
+
+ Queue<InternalResourceInfo> resourceInfoInUtilizationOrder =
+ new PriorityQueue<>(
+ internalResources.size(),
+ Comparator.comparingDouble(i -> i.utilization));
+ resourceInfoInUtilizationOrder.addAll(internalResources);
+
+ while (numUnfulfilled > 0 &&
!resourceInfoInUtilizationOrder.isEmpty()) {
+ final InternalResourceInfo currentTaskManager =
+ resourceInfoInUtilizationOrder.peek();
+
+ if (currentTaskManager.tryAllocateSlotForJob(jobId,
requiredResource)) {
+ numUnfulfilled--;
+
+ // ignore non resource task managers to reduce the
overhead of insert.
+ resourceInfoInUtilizationOrder.poll();
+ if
(!currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+ resourceInfoInUtilizationOrder.add(currentTaskManager);
+ }
+ } else {
+ break;
+ }
Review Comment:
I think we should not break the loop here. It is possible that the head of
the queue does not match the requirement while other workers in the queue
do. E.g., the worker with a lower utilization may run out of a specific
type (e.g., heap) of memory, while a worker with a higher utilization has
enough memory for all the types.
I think we should always poll the worker from the queue, and only add it
back if the allocation succeeds. That means workers that do not match the
requirement will be excluded, and eventually either all requirements are
fulfilled or there are no more workers in the queue.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java:
##########
@@ -162,6 +162,7 @@ public DeclarativeSlotManager build() {
declareNeededResourceDelay,
waitResultConsumedBeforeRelease,
slotMatchingStrategy,
+ false,
Review Comment:
This should be consistent with `slotMatchingStrategy`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -237,27 +239,133 @@ && canFulfillRequirement(effectiveProfile,
remainResource)) {
private static class InternalResourceInfo {
private final ResourceProfile defaultSlotProfile;
private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+ private final ResourceProfile totalProfile;
private ResourceProfile availableProfile;
+ private double utilization;
InternalResourceInfo(
ResourceProfile defaultSlotProfile,
+ ResourceProfile totalProfile,
ResourceProfile availableProfile,
BiConsumer<JobID, ResourceProfile> allocationConsumer) {
+
Preconditions.checkState(!defaultSlotProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!totalProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!availableProfile.equals(ResourceProfile.UNKNOWN));
this.defaultSlotProfile = defaultSlotProfile;
+ this.totalProfile = totalProfile;
this.availableProfile = availableProfile;
this.allocationConsumer = allocationConsumer;
+ this.utilization = updateUtilization();
+ }
+
+ boolean availableResourceMatchingRequest(ResourceProfile requirement) {
+ final ResourceProfile effectiveProfile =
+ getEffectiveResourceProfile(requirement,
defaultSlotProfile);
+ return availableProfile.allFieldsNoLessThan(effectiveProfile);
}
boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile
requirement) {
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(requirement,
defaultSlotProfile);
- if (availableProfile.allFieldsNoLessThan(effectiveProfile)) {
+ if (availableResourceMatchingRequest(requirement)) {
availableProfile = availableProfile.subtract(effectiveProfile);
allocationConsumer.accept(jobId, effectiveProfile);
+ utilization = updateUtilization();
return true;
} else {
return false;
}
}
+
+ private double updateUtilization() {
+ double cpuUtilization =
+ totalProfile
+ .getCpuCores()
+ .subtract(availableProfile.getCpuCores())
+ .getValue()
+ .doubleValue()
+ /
totalProfile.getCpuCores().getValue().doubleValue();
+ double memoryUtilization =
+ (double)
+ totalProfile
+ .getTotalMemory()
+
.subtract(availableProfile.getTotalMemory())
+ .getBytes()
+ / totalProfile.getTotalMemory().getBytes();
+ return Math.max(cpuUtilization, memoryUtilization);
+ }
+ }
+
+ private interface ResourceMatchingStrategy {
+
+ int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId);
+ }
+
+ private enum AnyMatchingResourceMatchingStrategy implements
ResourceMatchingStrategy {
+ INSTANCE;
+
+ @Override
+ public int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId) {
+ final Iterator<InternalResourceInfo> internalResourceInfoItr =
+ internalResources.iterator();
+ while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
+ final InternalResourceInfo currentTaskManager =
internalResourceInfoItr.next();
+ while (numUnfulfilled > 0
+ && currentTaskManager.tryAllocateSlotForJob(jobId,
requiredResource)) {
+ numUnfulfilled--;
+ }
+ if
(currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+ internalResourceInfoItr.remove();
+ }
+ }
+ return numUnfulfilled;
+ }
+ }
+
+ private enum LeastUtilizationResourceMatchingStrategy implements
ResourceMatchingStrategy {
+ INSTANCE;
+
+ @Override
+ public int tryFulfilledRequirementWithResource(
+ List<InternalResourceInfo> internalResources,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId) {
+ if (internalResources.isEmpty()) {
+ return numUnfulfilled;
+ }
+
+ Queue<InternalResourceInfo> resourceInfoInUtilizationOrder =
+ new PriorityQueue<>(
+ internalResources.size(),
+ Comparator.comparingDouble(i -> i.utilization));
+ resourceInfoInUtilizationOrder.addAll(internalResources);
+
+ while (numUnfulfilled > 0 &&
!resourceInfoInUtilizationOrder.isEmpty()) {
+ final InternalResourceInfo currentTaskManager =
+ resourceInfoInUtilizationOrder.peek();
+
+ if (currentTaskManager.tryAllocateSlotForJob(jobId,
requiredResource)) {
+ numUnfulfilled--;
+
+ // ignore non resource task managers to reduce the
overhead of insert.
+ resourceInfoInUtilizationOrder.poll();
+ if
(!currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+ resourceInfoInUtilizationOrder.add(currentTaskManager);
+ }
+ } else {
+ break;
+ }
Review Comment:
And we should also add a test case for the above example.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java:
##########
@@ -39,7 +39,7 @@
protected Optional<ResourceAllocationStrategy>
getResourceAllocationStrategy() {
return Optional.of(
new DefaultResourceAllocationStrategy(
- DEFAULT_TOTAL_RESOURCE_PROFILE,
DEFAULT_NUM_SLOTS_PER_WORKER));
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
DEFAULT_NUM_SLOTS_PER_WORKER, false));
Review Comment:
This creates a chance that the evenly-spread-out configuration is
inconsistent between the resource allocation strategy and the slot manager
configuration.
I think we should pass the slot manager configuration into
`getResourceAllocationStrategy` and read the boolean from it.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -237,27 +239,133 @@ && canFulfillRequirement(effectiveProfile,
remainResource)) {
private static class InternalResourceInfo {
private final ResourceProfile defaultSlotProfile;
private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+ private final ResourceProfile totalProfile;
private ResourceProfile availableProfile;
+ private double utilization;
InternalResourceInfo(
ResourceProfile defaultSlotProfile,
+ ResourceProfile totalProfile,
ResourceProfile availableProfile,
BiConsumer<JobID, ResourceProfile> allocationConsumer) {
+
Preconditions.checkState(!defaultSlotProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!totalProfile.equals(ResourceProfile.UNKNOWN));
+
Preconditions.checkState(!availableProfile.equals(ResourceProfile.UNKNOWN));
this.defaultSlotProfile = defaultSlotProfile;
+ this.totalProfile = totalProfile;
this.availableProfile = availableProfile;
this.allocationConsumer = allocationConsumer;
+ this.utilization = updateUtilization();
+ }
+
+ boolean availableResourceMatchingRequest(ResourceProfile requirement) {
+ final ResourceProfile effectiveProfile =
+ getEffectiveResourceProfile(requirement,
defaultSlotProfile);
+ return availableProfile.allFieldsNoLessThan(effectiveProfile);
Review Comment:
We should not need this method. It is only used in
`tryAllocateSlotForJob()`, where the `effectiveProfile` is already calculated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]