xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1181534754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +121,93 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        // Unlike tryFulfillRequirementsForJobWithPendingResources, which updates pendingResources
+        // to the latest state after a new PendingTaskManager is created,
+        // tryFulFillRedundantResources will not update pendingResources even after new
+        // PendingTaskManagers are created.
+        // This is because the pendingResources are no longer needed afterwards.
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUnusedResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        ResourceProfile requiredRedundantResources =
+                totalResourceProfile.multiply(redundantTaskManagerNum);
+        ResourceReleaseResult.Builder builder = ResourceReleaseResult.builder();
+
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (pendingTaskManager.getPendingSlotAllocationRecords().isEmpty()) {
+                                pendingTaskManagersNonUse.add(pendingTaskManager);
+                            } else {
+                                pendingTaskManagersInuse.add(pendingTaskManager);
+                            }
+                        });
+
+        if (taskManagersIdleTimeout.isEmpty() && pendingTaskManagersNonUse.isEmpty()) {
+            // short-cut for nothing to release
+            return builder.build();
+        }
+
+        ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
+
+        // check whether available resources of used (pending) task manager is enough.
+        ResourceProfile availableResourcesOfNonIdle =
+                getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
+        if (!canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
+            ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
+                    getAvailableResourceOfPendingTaskManagers(pendingTaskManagersInuse);
+            resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
+        }
+
+        // try reserve or release unused (pending) task managers
+        for (TaskManagerInfo taskManagerInfo : taskManagersIdleTimeout) {
+            if (canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {

Review Comment:
   Minor: we can use a local variable to skip unnecessary `canFulfillRequirement` calls once the redundancy requirement has been met, e.g. `if (redundantFulfilled || canFulfillRequirement(xxx, xxx))`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This

Review Comment:
   ```suggestion
        * Try to make a release decision to release unused PendingTaskManagers and TaskManagers. This
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -135,19 +135,25 @@ class TaskExecutorManager implements AutoCloseable {
         this.unWantedWorkers = new HashSet<>();
         this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
         this.mainThreadExecutor = mainThreadExecutor;
-        taskManagerTimeoutsAndRedundancyCheck =
-                scheduledExecutor.scheduleWithFixedDelay(
-                        () ->
-                                mainThreadExecutor.execute(
-                                        this::checkTaskManagerTimeoutsAndRedundancy),
-                        0L,
-                        taskManagerTimeout.toMilliseconds(),
-                        TimeUnit.MILLISECONDS);
+        if (resourceAllocator.isSupported()) {
+            taskManagerTimeoutsAndRedundancyCheck =
+                    scheduledExecutor.scheduleWithFixedDelay(
+                            () ->
+                                    mainThreadExecutor.execute(
+                                            this::checkTaskManagerTimeoutsAndRedundancy),
+                            0L,
+                            taskManagerTimeout.toMilliseconds(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            taskManagerTimeoutsAndRedundancyCheck = null;

Review Comment:
   Let's mark `taskManagerTimeoutsAndRedundancyCheck` as `@Nullable`.
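
Roughly along these lines, as a minimal sketch only. `PeriodicCheckSketch` and `isScheduled` are made-up names, and the locally declared `Nullable` annotation is a stand-in so the snippet compiles without extra dependencies; in the actual class I would expect `javax.annotation.Nullable` to be used.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Local stand-in for javax.annotation.Nullable (assumption, for self-containment only). */
@interface Nullable {}

/** Hypothetical, simplified holder of an optional periodic check. */
class PeriodicCheckSketch implements AutoCloseable {
    /** Null when periodic checks are not supported, so every use needs a null check. */
    @Nullable private final ScheduledFuture<?> periodicCheck;

    PeriodicCheckSketch(ScheduledExecutorService executor, boolean supported, long periodMillis) {
        if (supported) {
            periodicCheck =
                    executor.scheduleWithFixedDelay(
                            () -> {}, 0L, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            periodicCheck = null;
        }
    }

    boolean isScheduled() {
        return periodicCheck != null;
    }

    @Override
    public void close() {
        // Guard against the "not supported" case, where nothing was scheduled.
        if (periodicCheck != null) {
            periodicCheck.cancel(false);
        }
    }
}
```

The annotation documents that the field can legitimately be `null`, which makes it harder to forget the null check in `close()` or anywhere else the future is cancelled.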



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This

Review Comment:
   There also seem to be a few more occurrences of `useless`. Please replace all of them with `unused` or `unwanted`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
