huwh commented on code in PR #23230:
URL: https://github.com/apache/flink/pull/23230#discussion_r1297326609


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java:
##########
@@ -22,17 +22,25 @@
 import java.util.List;
 
 /** Contains the results of the {@link ResourceAllocationStrategy}. */
-public class ResourceReleaseResult {
+public class ResourceReconcileResult {
+
+    private final List<PendingTaskManager> pendingTaskManagersToAdd;

Review Comment:
   It would be better to name this field pendingTaskManagersToAllocate.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -812,29 +812,34 @@ public Collection<SlotInfo> 
getAllocatedSlotsOf(InstanceID instanceID) {
     // Internal periodic check methods
     // 
---------------------------------------------------------------------------------------------
 
-    private void tryReleaseUnusedTaskManagers() {
-        if (checkTaskManagerReleasable()) {
+    private void checkClusterReconciliation() {
+        if (checkResourcesNeedReconcile()) {
             // only declare on needed.
             declareNeededResourcesWithDelay();
         }
     }
 
-    private boolean checkTaskManagerReleasable() {
-        ResourceReleaseResult releaseResult =
-                
resourceAllocationStrategy.tryReleaseUnusedResources(taskManagerTracker);
+    private boolean checkResourcesNeedReconcile() {
+        ResourceReconcileResult reconcileResult =
+                
resourceAllocationStrategy.tryReconcileClusterResources(taskManagerTracker);
 
-        releaseResult.getPendingTaskManagersToRelease().stream()
+        reconcileResult.getPendingTaskManagersToRelease().stream()
                 .map(PendingTaskManager::getPendingTaskManagerId)
                 .forEach(taskManagerTracker::removePendingTaskManager);
 
-        for (TaskManagerInfo taskManagerToRelease : 
releaseResult.getTaskManagersToRelease()) {
+        for (TaskManagerInfo taskManagerToRelease : 
reconcileResult.getTaskManagersToRelease()) {
             if (waitResultConsumedBeforeRelease) {
                 releaseIdleTaskExecutorIfPossible(taskManagerToRelease);
             } else {
                 releaseIdleTaskExecutor(taskManagerToRelease.getInstanceId());
             }
         }
-        return releaseResult.needRelease();
+
+        reconcileResult
+                .getPendingTaskManagersToAdd()
+                .forEach(taskManagerTracker::addPendingTaskManager);

Review Comment:
   We should use #allocateResource to ensure the maximum resource limit is 
not exceeded.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReconcileResult.java:
##########
@@ -41,18 +49,26 @@ public List<TaskManagerInfo> getTaskManagersToRelease() {
         return taskManagersToRelease;
     }
 
-    public boolean needRelease() {
-        return !pendingTaskManagersToRelease.isEmpty() || 
!taskManagersToRelease.isEmpty();
+    public boolean needReconcile() {
+        return pendingTaskManagersToRelease.size() > 0
+                || taskManagersToRelease.size() > 0
+                || pendingTaskManagersToAdd.size() > 0;
     }
 
     public static Builder builder() {
         return new Builder();
     }
 
     public static class Builder {

Review Comment:
   Javadoc was not added in the previous commits. Could you help add it?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java:
##########
@@ -440,13 +441,52 @@ void 
testUnusedResourcesShouldBeReleasedIfNonIdleResourceIsEnough() {
                         .build();
 
         DefaultResourceAllocationStrategy strategy = createStrategy(1);
-        ResourceReleaseResult result =
-                
strategy.tryReleaseUnusedResources(taskManagerResourceInfoProvider);
+        ResourceReconcileResult result =
+                
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
         assertThat(result.getPendingTaskManagersToRelease())
                 .containsExactly(pendingTaskManagerIdle);
         
assertThat(result.getTaskManagersToRelease()).containsExactly(taskManagerIdle);
     }
 
+    @Test
+    void testRedundantResourceShouldBeFulfilled() {
+        final TaskManagerInfo taskManagerInUse =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE.multiply(2),
+                        DEFAULT_SLOT_RESOURCE);
+
+        final TestingTaskManagerInfo taskManagerIdle =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE.multiply(5),
+                        DEFAULT_SLOT_RESOURCE);
+        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10);
+
+        final PendingTaskManager pendingTaskManagerIdle =
+                new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(5), 
NUM_OF_SLOTS);
+
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(
+                                () -> Arrays.asList(taskManagerInUse, 
taskManagerIdle))
+                        .setPendingTaskManagersSupplier(
+                                () -> 
Collections.singletonList(pendingTaskManagerIdle))
+                        .build();
+
+        DefaultResourceAllocationStrategy strategy = createStrategy(4);
+        ResourceReconcileResult result =
+                
strategy.tryReconcileClusterResources(taskManagerResourceInfoProvider);
+
+        // pending task manager should reserved for redundant
+        assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
+        // both in use and idle task manager should be reserved for redundant
+        assertThat(result.getTaskManagersToRelease()).isEmpty();
+        // add two more pending task manager for redundant since total 
available resource equals
+        // 12(2+5+5)
+        assertThat(result.getPendingTaskManagersToAdd().size()).isEqualTo(2);

Review Comment:
   `assertThat(result.getPendingTaskManagersToAdd()).hasSize(2);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to