KarmaGYZ commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890703027


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -142,6 +142,13 @@ public class ResourceManagerOptions {
                                     + START_WORKER_MAX_FAILURE_RATE.key()
                                     + "') is reached.");
 
+    @Documentation.ExcludeFromDocumentation

Review Comment:
   Why introduce this config option? Is it meant to be user-configurable? If so, why exclude it from the documentation?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -407,13 +420,39 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
         LOG.debug("Freeing slot {}.", slotId);
 
         slotTracker.notifyFree(slotId);
-        checkResourceRequirements();
+        checkResourceRequirementsWithDelay();
     }
 
     // 
---------------------------------------------------------------------------------------------
     // Requirement matching
     // 
---------------------------------------------------------------------------------------------
 
+    /**
+     * Depending on the implementation of {@link ResourceAllocationStrategy}, checking resource
+     * requirements and potentially making a re-allocation can be heavy. In order to cover more
+     * changes with each check, thus reduce the frequency of unnecessary re-allocations, the checks
+     * are performed with a slight delay.
+     */
+    private void checkResourceRequirementsWithDelay() {
+        if (requirementsCheckDelay.toMillis() <= 0) {

Review Comment:
   If we allow users to configure the delay period, we should also check it in `FineGrainedSlotManager`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1347,7 +1351,7 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw
                         .setSlotTracker(slotTracker)
                         .buildAndStart(
                                 ResourceManagerId.generate(),
-                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                Executors.directExecutor(),

Review Comment:
   Could you help me understand why this change is needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1424,6 +1428,37 @@ public void testReclaimInactiveSlotsOnClearRequirements() throws Exception {
         }
     }
 
+    @Test
+    public void testProcessResourceRequirementsWithDelay() throws Exception {

Review Comment:
   We should also test that the check method is triggered only once per delay period.
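
   To make the suggested test concrete, here is a minimal, self-contained sketch of the idea. It is not Flink code: `ManualScheduler`, `DelayedChecker`, and `checksAfterThreeTriggers` are hypothetical names introduced only for illustration, and the coalescing behavior (one scheduled check per delay period) is an assumption about what the PR intends. A real test would presumably use Flink's own manually triggered executor utilities instead.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

public class DelayedCheckSketch {

    /** Hypothetical stand-in for a scheduled executor; tasks run only when the test fires them. */
    static final class ManualScheduler {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void schedule(Runnable task, Duration delay) {
            // The delay is ignored here; the test decides when the period has "elapsed".
            pending.add(task);
        }

        void fireAll() {
            // Run only the tasks queued so far; tasks scheduled meanwhile wait for the next call.
            int queued = pending.size();
            for (int i = 0; i < queued; i++) {
                pending.poll().run();
            }
        }
    }

    /** Hypothetical component that coalesces triggers into one check per delay period. */
    static final class DelayedChecker {
        private final Duration delay;
        private final ManualScheduler scheduler;
        private boolean checkScheduled = false;
        int checkCount = 0;

        DelayedChecker(Duration delay, ManualScheduler scheduler) {
            this.delay = delay;
            this.scheduler = scheduler;
        }

        void triggerCheck() {
            if (delay.toMillis() <= 0) {
                // No delay configured: check immediately, as in the guarded branch of the diff.
                check();
                return;
            }
            if (!checkScheduled) {
                // Assumption: later triggers within the same period piggyback on this one.
                checkScheduled = true;
                scheduler.schedule(
                        () -> {
                            checkScheduled = false;
                            check();
                        },
                        delay);
            }
        }

        private void check() {
            checkCount++;
        }
    }

    /** Returns how many checks ran after three triggers followed by one elapsed delay period. */
    public static int checksAfterThreeTriggers(Duration delay) {
        ManualScheduler scheduler = new ManualScheduler();
        DelayedChecker checker = new DelayedChecker(delay, scheduler);
        checker.triggerCheck();
        checker.triggerCheck();
        checker.triggerCheck();
        scheduler.fireAll();
        return checker.checkCount;
    }

    public static void main(String[] args) {
        System.out.println(checksAfterThreeTriggers(Duration.ofMillis(50))); // prints 1
        System.out.println(checksAfterThreeTriggers(Duration.ZERO)); // prints 3
    }
}
```

   With a positive delay, three triggers collapse into a single check. With a zero delay, each trigger checks immediately. Asserting on the first case is what "only once in one delay period" would look like under these assumptions.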


