xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1168233660


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -315,6 +318,7 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
             jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
             if (resourceAllocator.isSupported()) {
                 taskManagerTracker.clearPendingAllocationsOfJob(resourceRequirements.getJobId());
+                checkTaskManagerReleasable();

Review Comment:
   This is probably not necessary, because `checkResourceRequirementsWithDelay` will be called anyway at the end of this method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -236,6 +323,52 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
         }
     }
 
+    private void tryFulFillRedundantResources(
+            ResourceProfile requiredRedundantResource,
+            List<InternalResourceInfo> availableRegisteredResources,
+            List<InternalResourceInfo> availablePendingResources,
+            ResourceAllocationResult.Builder resultBuilder) {
+        ResourceProfile totalAvailableResources =
+                Stream.concat(
+                                availableRegisteredResources.stream(),
+                                availablePendingResources.stream())
+                        .map(internalResourceInfo -> internalResourceInfo.availableProfile)
+                        .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+        while (!canFulfillRequirement(requiredRedundantResource, totalAvailableResources)) {
+            PendingTaskManager pendingTaskManager =
+                    new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
+            resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+            totalAvailableResources = totalAvailableResources.merge(totalResourceProfile);
+        }
+    }
+
+    private ResourceProfile getAvailableResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
+        return taskManagers.stream()
+                .map(TaskManagerInfo::getAvailableResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+    }
+
+    private ResourceProfile getAvailableResourceOfPendingTaskManagers(
+            List<PendingTaskManager> pendingTaskManagers,
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return pendingTaskManagers.stream()
+                .map(
+                        pendingTaskManager -> {
+                            ResourceProfile usedResource =
+                                    taskManagerResourceInfoProvider
+                                            .getPendingAllocationsOfPendingTaskManager(
+                                                    pendingTaskManager.getPendingTaskManagerId())
+                                            .values().stream()
+                                            .map(ResourceCounter::getTotalResource)
+                                            .reduce(ResourceProfile.ZERO, ResourceProfile::merge);

Review Comment:
   This is expensive. We should probably maintain the available resources, or the total allocated resources, in `PendingTaskManager`, as we do with `FineGrainedTaskManagerRegistration`.
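   A minimal sketch of that bookkeeping, assuming a simplified resource model (CPU cores plus memory in MB) in place of `ResourceProfile`, and a single resource amount per job in place of a `ResourceCounter`. `SimpleResource` and `PendingTaskManagerSketch` are hypothetical names, not Flink API: the allocated total is adjusted whenever an allocation changes, so reading the available resources no longer requires re-summing all pending allocations.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Hypothetical stand-in for ResourceProfile: CPU cores plus memory in MB. */
   final class SimpleResource {
       static final SimpleResource ZERO = new SimpleResource(0.0, 0L);

       final double cpuCores;
       final long memoryMb;

       SimpleResource(double cpuCores, long memoryMb) {
           this.cpuCores = cpuCores;
           this.memoryMb = memoryMb;
       }

       SimpleResource merge(SimpleResource other) {
           return new SimpleResource(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
       }

       SimpleResource subtract(SimpleResource other) {
           return new SimpleResource(cpuCores - other.cpuCores, memoryMb - other.memoryMb);
       }
   }

   /** Hypothetical pending TM that keeps its allocated total up to date on every change. */
   final class PendingTaskManagerSketch {
       private final SimpleResource totalResource;
       // Pending allocation per job (String stands in for JobID).
       private final Map<String, SimpleResource> allocationsByJob = new HashMap<>();
       // Running total of all pending allocations, maintained incrementally.
       private SimpleResource allocatedResource = SimpleResource.ZERO;

       PendingTaskManagerSketch(SimpleResource totalResource) {
           this.totalResource = totalResource;
       }

       /** Replaces the pending allocation of one job and adjusts the running total. */
       void replaceAllocation(String jobId, SimpleResource allocation) {
           SimpleResource previous = allocationsByJob.put(jobId, allocation);
           if (previous != null) {
               allocatedResource = allocatedResource.subtract(previous);
           }
           allocatedResource = allocatedResource.merge(allocation);
       }

       /** Drops the pending allocation of one job, e.g. when its requirements are cleared. */
       void clearAllocationsOfJob(String jobId) {
           SimpleResource previous = allocationsByJob.remove(jobId);
           if (previous != null) {
               allocatedResource = allocatedResource.subtract(previous);
           }
       }

       /** Constant time: no iteration over the pending allocations. */
       SimpleResource getAvailableResource() {
           return totalResource.subtract(allocatedResource);
       }
   }
   ```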



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,15 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers.
+     *
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
+     * @return a {@link ResourceReleaseResult} based on the current status, which contains the
+     *     actions to take
+     */
+    ResourceReleaseResult tryReleaseUselessResources(

Review Comment:
   I think we should explain in the JavaDoc how this differs from `tryFulfillRequirements`, i.e., it only considers empty registered / pending workers, assumes all requirements are fulfilled by registered / pending workers, is more lightweight, etc.
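   One possible wording for that JavaDoc, based on the points above and using the `tryReleaseUnusedResources` name suggested elsewhere in this review. The `*Stub` types are placeholders so the snippet compiles on its own, and the existing `tryFulfillRequirements` method is omitted.

   ```java
   /** Placeholder for the registered/pending resource view passed to the strategy. */
   interface TaskManagerResourceInfoProviderStub {}

   /** Placeholder for the release decision (workers and pending workers to release). */
   interface ResourceReleaseResultStub {}

   /** Trimmed sketch: only the release method of the strategy is shown. */
   interface ReleaseStrategySketch {
       /**
        * Try to make a release decision to release unused PendingTaskManagers and TaskManagers.
        *
        * <p>Unlike {@code tryFulfillRequirements}, this method does not match resource
        * requirements against workers. It assumes that all requirements are already fulfilled
        * by the registered and pending workers, and only considers empty ones: registered
        * workers that have been idle for longer than the timeout, and pending workers without
        * pending allocations, while retaining enough resources for the redundancy requirement.
        * It is therefore much more lightweight.
        *
        * @param taskManagerResourceInfoProvider provides the registered/pending resources of
        *     the current cluster
        * @return a result based on the current status, which contains the actions to take
        */
       ResourceReleaseResultStub tryReleaseUnusedResources(
               TaskManagerResourceInfoProviderStub taskManagerResourceInfoProvider);
   }
   ```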



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,15 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers.
+     *
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
+     * @return a {@link ResourceReleaseResult} based on the current status, which contains the
+     *     actions to take
+     */
+    ResourceReleaseResult tryReleaseUselessResources(

Review Comment:
   ```suggestion
       ResourceReleaseResult tryReleaseUnusedResources(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -288,6 +290,7 @@ public void clearResourceRequirements(JobID jobId) {
         resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
         if (resourceAllocator.isSupported()) {
             taskManagerTracker.clearPendingAllocationsOfJob(jobId);
+            checkTaskManagerReleasable();

Review Comment:
   It's a bit unclear to me what the relationship is between `checkTaskManagerReleasable` and `declareNeededResourcesWithDelay`.
   - The pattern where `declareNeededResourcesWithDelay` is called right after `checkTaskManagerReleasable` appears 3 times.
   - Inside `checkTaskManagerReleasable`, `declareNeededResourcesWithDelay` is also called, both in the if branch and in `releaseIdleTaskExecutor`.
   
   Would it be better if `checkTaskManagerReleasable` only marked releasable workers as unwanted, and the declaration were made afterwards? That would make `releaseIdleTaskExecutorIfPossible` the only exception, where we would need a declaration after the `canBeRelease` future completes.
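   A rough sketch of the proposed split, with hypothetical names throughout (`markReleasableWorkersAsUnwanted`, `onRequirementsCleared`, `releaseIdleWorkerIfPossible`) and a counter standing in for `declareNeededResourcesWithDelay`: the check only marks workers, each synchronous caller declares exactly once afterwards, and only the asynchronous path declares when the future completes.

   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical sketch: checking releasability only marks workers; callers declare once. */
   final class ReleaseThenDeclareSketch {
       final Set<String> unwantedWorkers = new HashSet<>();
       int declarationCount = 0;

       /** Only marks workers as unwanted; never declares resources itself. */
       void markReleasableWorkersAsUnwanted(Collection<String> releasableWorkers) {
           unwantedWorkers.addAll(releasableWorkers);
       }

       /** Stand-in for declareNeededResourcesWithDelay(). */
       void declareNeededResources() {
           declarationCount++;
       }

       /** Synchronous callers: mark first, then declare exactly once. */
       void onRequirementsCleared(Collection<String> releasableWorkers) {
           markReleasableWorkersAsUnwanted(releasableWorkers);
           declareNeededResources();
       }

       /** The single asynchronous exception: declare only after the future completes. */
       CompletableFuture<Void> releaseIdleWorkerIfPossible(
               String workerId, CompletableFuture<Boolean> canBeReleased) {
           return canBeReleased.thenAccept(
                   releasable -> {
                       if (releasable) {
                           markReleasableWorkersAsUnwanted(Collections.singletonList(workerId));
                           declareNeededResources();
                       }
                   });
       }
   }
   ```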



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +122,84 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUselessResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (taskManagerResourceInfoProvider
+                                    .getPendingAllocationsOfPendingTaskManager(
+                                            pendingTaskManager.getPendingTaskManagerId())
+                                    .isEmpty()) {
+                                pendingTaskManagersNonUse.add(pendingTaskManager);
+                            } else {
+                                pendingTaskManagersInuse.add(pendingTaskManager);
+                            }
+                        });
+
+        // summary total available resources of using (pending) task managers
+        ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
+        ResourceProfile availableResourcesOfNonIdle =
+                getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
+        ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
+                getAvailableResourceOfPendingTaskManagers(
+                        pendingTaskManagersInuse, taskManagerResourceInfoProvider);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
+

Review Comment:
   We might want to do some pruning here.
   - If the available resources of the non-idle registered resources already meet the redundancy requirement, we don't need to calculate the available resources of the pending resources.
   - If there are no idle registered / pending resources, we don't need to release anything.
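   A sketch of both pruning steps, assuming resources are reduced to a single count of hypothetical "units" per worker and that idle pending workers can be handled like idle registered ones. The class and method names are illustrative, and the keep-until-redundancy-is-met loop at the end is only a placeholder for the actual release policy.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   /** Hypothetical worker view: an ID plus its available resource units. */
   final class WorkerView {
       final String id;
       final long availableUnits;

       WorkerView(String id, long availableUnits) {
           this.id = id;
           this.availableUnits = availableUnits;
       }
   }

   final class ReleasePruningSketch {
       /** Counts how often the (expensive) in-use pending worker summation runs. */
       int pendingComputations = 0;

       private static long sumAvailable(List<WorkerView> workers) {
           long sum = 0;
           for (WorkerView worker : workers) {
               sum += worker.availableUnits;
           }
           return sum;
       }

       /** Returns the IDs of idle (registered or pending) workers that may be released. */
       List<String> workersToRelease(
               List<WorkerView> idleRegistered,
               List<WorkerView> idlePending,
               List<WorkerView> nonIdleRegistered,
               List<WorkerView> inUsePending,
               long redundantUnitsRequired) {
           // Pruning 1: with no idle registered/pending workers there is nothing to release.
           if (idleRegistered.isEmpty() && idlePending.isEmpty()) {
               return Collections.emptyList();
           }
           long kept = sumAvailable(nonIdleRegistered);
           // Pruning 2: sum in-use pending workers only if registered ones fall short.
           if (kept < redundantUnitsRequired) {
               pendingComputations++;
               kept += sumAvailable(inUsePending);
           }
           // Placeholder policy: keep idle workers until redundancy is met, release the rest.
           List<WorkerView> idle = new ArrayList<>(idleRegistered);
           idle.addAll(idlePending);
           List<String> toRelease = new ArrayList<>();
           for (WorkerView worker : idle) {
               if (kept < redundantUnitsRequired) {
                   kept += worker.availableUnits;
               } else {
                   toRelease.add(worker.id);
               }
           }
           return toRelease;
       }
   }
   ```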



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +122,84 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUselessResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (taskManagerResourceInfoProvider
+                                    .getPendingAllocationsOfPendingTaskManager(

Review Comment:
   This is expensive. We are querying a map for each pending TM. I think we can modify `TaskManagerResourceInfoProvider` to return a map of pending TMs and their allocations (including empty ones) and iterate over the map entries.
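   A sketch of the single-pass partition, assuming the provider returns a map from each pending TM ID to its pending allocations per job, with an empty inner map for pending TMs without allocations. `String` IDs and `Integer` amounts stand in for `PendingTaskManagerId`, `JobID` and `ResourceCounter`; the class names are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical result of splitting pending TMs into unused and in-use ones. */
   final class PendingPartition {
       final List<String> unused = new ArrayList<>();
       final List<String> inUse = new ArrayList<>();
   }

   final class PendingAllocationsScan {
       /** One pass over the entries; no per-TM lookup into a separate map. */
       static PendingPartition partition(
               Map<String, Map<String, Integer>> pendingAllocationsByPendingTm) {
           PendingPartition result = new PendingPartition();
           for (Map.Entry<String, Map<String, Integer>> entry :
                   pendingAllocationsByPendingTm.entrySet()) {
               // An empty inner map means no job has pending allocations on this TM.
               if (entry.getValue().isEmpty()) {
                   result.unused.add(entry.getKey());
               } else {
                   result.inUse.add(entry.getKey());
               }
           }
           return result;
       }
   }
   ```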



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -236,6 +323,52 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
         }
     }
 
+    private void tryFulFillRedundantResources(
+            ResourceProfile requiredRedundantResource,
+            List<InternalResourceInfo> availableRegisteredResources,
+            List<InternalResourceInfo> availablePendingResources,
+            ResourceAllocationResult.Builder resultBuilder) {
+        ResourceProfile totalAvailableResources =
+                Stream.concat(
+                                availableRegisteredResources.stream(),
+                                availablePendingResources.stream())
+                        .map(internalResourceInfo -> internalResourceInfo.availableProfile)
+                        .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+        while (!canFulfillRequirement(requiredRedundantResource, totalAvailableResources)) {
+            PendingTaskManager pendingTaskManager =
+                    new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
+            resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+            totalAvailableResources = totalAvailableResources.merge(totalResourceProfile);

Review Comment:
   I noticed that `tryFulfillRequirementsForJobWithPendingResources` adds new pending resources to `availableResources`, so that the passed-in `availableResources` always reflects the latest status and can be used across the for-loop in `tryFulfillRequirements`, whereas `tryFulFillRedundantResources` does not keep `availableResources` up to date. This is understandable, because we no longer need `availableResources` after calling `tryFulFillRedundantResources` in `tryFulfillRequirements`. However, it would be nice to add a comment here explaining this, to make it easier for future developers to understand.
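   A simplified sketch of where such a comment could go, with resources reduced to a single `long` of hypothetical "units" and illustrative names; it returns how many pending workers to request instead of adding them to a result builder.

   ```java
   import java.util.List;

   final class RedundancySketch {
       /**
        * Returns how many extra pending workers to request so that the total available units
        * reach {@code requiredRedundantUnits}.
        */
       static int pendingWorkersToAllocate(
               long requiredRedundantUnits,
               List<Long> availableRegisteredUnits,
               List<Long> availablePendingUnits,
               long unitsPerWorker) {
           if (unitsPerWorker <= 0) {
               throw new IllegalArgumentException("unitsPerWorker must be positive");
           }
           long totalAvailable = 0;
           for (long units : availableRegisteredUnits) {
               totalAvailable += units;
           }
           for (long units : availablePendingUnits) {
               totalAvailable += units;
           }

           int newPendingWorkers = 0;
           while (totalAvailable < requiredRedundantUnits) {
               // Unlike tryFulfillRequirementsForJobWithPendingResources, we do NOT add the new
               // pending worker to availablePendingUnits. This is fine because this is the last
               // step of tryFulfillRequirements and nothing reads the available-resource lists
               // afterwards; update them here if that ever changes.
               newPendingWorkers++;
               totalAvailable += unitsPerWorker;
           }
           return newPendingWorkers;
       }
   }
   ```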



-- 
This is an automated message from the Apache Git Service.