xintongsong commented on a change in pull request #15668:
URL: https://github.com/apache/flink/pull/15668#discussion_r616446455
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
##########
@@ -49,7 +50,7 @@
*
* <p>Note: The current implementation of this strategy is non-optimal, in
terms of computation
* efficiency. In the worst case, for each requirement it checks all
registered and pending
- * resources. TODO: This will be optimized in FLINK-21174.
+ * resources.
Review comment:
I'd suggest further updating the docs here:
- "for each requirement" -> "for each distinctly profiled requirement"
- Further optimization would require complex data structures for ordering
multi-dimensional resource profiles; that complexity is not necessary.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
##########
@@ -71,12 +72,11 @@ public ResourceAllocationResult tryFulfillRequirements(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
final ResourceAllocationResult.Builder resultBuilder =
ResourceAllocationResult.builder();
- // Tuples of available and default slot resource for registered task
managers, indexed by
- // instanceId
- final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>
registeredResources =
+ // Tuples of instanceId, available and default slot resource for
registered task managers
+ final List<Tuple3<InstanceID, ResourceProfile, ResourceProfile>>
registeredResources =
getRegisteredResources(taskManagerResourceInfoProvider);
- // Available resources of pending task managers, indexed by the
pendingTaskManagerId
- final Map<PendingTaskManagerId, ResourceProfile> pendingResources =
+ // PendingTaskManagerId and available resource of pending task managers
+ final List<Tuple2<PendingTaskManagerId, ResourceProfile>>
pendingResources =
Review comment:
I would suggest introducing an inner class to replace the tuples.
```
class InternalResourceInfo() {
AbstractID id;
ResourceProfile default;
ResourceProfile available;
void occupy(ResourceProfile rp) {
// xxx
};
}
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
##########
@@ -142,115 +145,98 @@ private static ResourceCounter
tryFulfillRequirementsForJobWithRegisteredResourc
private static int tryFindSlotsForRequirement(
JobID jobId,
ResourceRequirement resourceRequirement,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>
registeredResources,
+ List<Tuple3<InstanceID, ResourceProfile, ResourceProfile>>
registeredResources,
ResourceAllocationResult.Builder resultBuilder) {
final ResourceProfile requiredResource =
resourceRequirement.getResourceProfile();
int numUnfulfilled = resourceRequirement.getNumberOfRequiredSlots();
- while (numUnfulfilled > 0) {
- final Optional<InstanceID> matchedTaskManager =
- findMatchingTaskManager(requiredResource,
registeredResources);
-
- if (!matchedTaskManager.isPresent()) {
- // exit loop early; we won't find a matching slot for this
requirement
- break;
- }
-
+ final Iterator<Tuple3<InstanceID, ResourceProfile, ResourceProfile>>
+ registeredResourcesItr = registeredResources.iterator();
+ while (numUnfulfilled > 0 && registeredResourcesItr.hasNext()) {
+ final Tuple3<InstanceID, ResourceProfile, ResourceProfile>
currentTaskManager =
+ registeredResourcesItr.next();
final ResourceProfile effectiveProfile =
- getEffectiveResourceProfile(
- requiredResource,
registeredResources.get(matchedTaskManager.get()).f1);
- resultBuilder.addAllocationOnRegisteredResource(
- jobId, matchedTaskManager.get(), effectiveProfile);
- deductionRegisteredResource(
- registeredResources, matchedTaskManager.get(),
effectiveProfile);
- numUnfulfilled--;
+ getEffectiveResourceProfile(requiredResource,
currentTaskManager.f2);
+ while (numUnfulfilled > 0
+ && canFulfillRequirement(effectiveProfile,
currentTaskManager.f1)) {
+ resultBuilder.addAllocationOnRegisteredResource(
+ jobId, currentTaskManager.f0, effectiveProfile);
+
currentTaskManager.setField(currentTaskManager.f1.subtract(effectiveProfile),
1);
+ numUnfulfilled--;
+ }
+ if (currentTaskManager.f1.equals(ResourceProfile.ZERO)) {
+ registeredResourcesItr.remove();
+ }
}
return numUnfulfilled;
}
- private static Optional<InstanceID> findMatchingTaskManager(
- ResourceProfile requirement,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>
registeredResources) {
- return registeredResources.entrySet().stream()
- .filter(
- taskManager ->
- canFulfillRequirement(
- getEffectiveResourceProfile(
- requirement,
taskManager.getValue().f1),
- taskManager.getValue().f0))
- .findAny()
- .map(Map.Entry::getKey);
- }
-
private static boolean canFulfillRequirement(
ResourceProfile requirement, ResourceProfile resourceProfile) {
return resourceProfile.allFieldsNoLessThan(requirement);
}
- private static void deductionRegisteredResource(
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>
registeredResources,
- InstanceID instanceId,
- ResourceProfile resourceProfile) {
- registeredResources.compute(
- instanceId,
- (id, tuple2) -> {
- Preconditions.checkNotNull(tuple2);
- if
(tuple2.f0.subtract(resourceProfile).equals(ResourceProfile.ZERO)) {
- return null;
- } else {
- return Tuple2.of(tuple2.f0.subtract(resourceProfile),
tuple2.f1);
- }
- });
- }
-
- private static Optional<PendingTaskManagerId> findPendingManagerToFulfill(
- ResourceProfile resourceProfile,
- Map<PendingTaskManagerId, ResourceProfile> availableResources) {
- return availableResources.entrySet().stream()
- .filter(entry ->
entry.getValue().allFieldsNoLessThan(resourceProfile))
- .findAny()
- .map(Map.Entry::getKey);
- }
-
private void tryFulfillRequirementsForJobWithPendingResources(
JobID jobId,
ResourceCounter unfulfilledRequirements,
- Map<PendingTaskManagerId, ResourceProfile> availableResources,
+ List<Tuple2<PendingTaskManagerId, ResourceProfile>>
availableResources,
ResourceAllocationResult.Builder resultBuilder) {
for (Map.Entry<ResourceProfile, Integer> missingResource :
unfulfilledRequirements.getResourcesWithCount()) {
// for this strategy, all pending resources should have the same
default slot resource
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(
missingResource.getKey(),
defaultSlotResourceProfile);
- for (int i = 0; i < missingResource.getValue(); i++) {
- Optional<PendingTaskManagerId> matchedPendingTaskManager =
- findPendingManagerToFulfill(effectiveProfile,
availableResources);
- if (matchedPendingTaskManager.isPresent()) {
- availableResources.compute(
- matchedPendingTaskManager.get(),
- ((pendingTaskManagerId, resourceProfile) ->
- Preconditions.checkNotNull(resourceProfile)
- .subtract(effectiveProfile)));
+ final Iterator<Tuple2<PendingTaskManagerId, ResourceProfile>>
availableResourcesItr =
+ availableResources.iterator();
+ int numUnfulfilled = missingResource.getValue();
+ while (numUnfulfilled > 0 && availableResourcesItr.hasNext()) {
+ final Tuple2<PendingTaskManagerId, ResourceProfile>
pendingTaskManager =
+ availableResourcesItr.next();
+ while (numUnfulfilled > 0
+ && canFulfillRequirement(effectiveProfile,
pendingTaskManager.f1)) {
resultBuilder.addAllocationOnPendingResource(
- jobId, matchedPendingTaskManager.get(),
effectiveProfile);
- } else {
- if
(totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
- // Add new pending task manager
- final PendingTaskManager pendingTaskManager =
- new PendingTaskManager(totalResourceProfile,
numSlotsPerWorker);
-
resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
- resultBuilder.addAllocationOnPendingResource(
- jobId,
- pendingTaskManager.getPendingTaskManagerId(),
- effectiveProfile);
- availableResources.put(
- pendingTaskManager.getPendingTaskManagerId(),
-
totalResourceProfile.subtract(effectiveProfile));
- } else {
- resultBuilder.addUnfulfillableJob(jobId);
- break;
- }
+ jobId, pendingTaskManager.f0, effectiveProfile);
+ numUnfulfilled--;
+ pendingTaskManager.setField(
+ pendingTaskManager.f1.subtract(effectiveProfile),
1);
+ }
+ if (pendingTaskManager.f1.equals(ResourceProfile.ZERO)) {
+ availableResourcesItr.remove();
+ }
+ }
Review comment:
I wonder if it makes sense to further deduplicate the matching logic for
registered/pending resources. As far as I can see, the two are very similar. The
only differences are:
- The action to take when a resource and a requirement are matched. We can make it a
function parameter.
- The handling of unfulfilled requirements (allocating more pending TMs and
reporting unfulfillable jobs), which is the very last step.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]