azagrebin commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r415579887



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -1301,16 +1305,15 @@ private SlotRequest createSlotRequest(JobID jobId, 
ResourceProfile resourceProfi
        }
 
        private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
-               return createSlotManager(resourceManagerId, 
resourceManagerActions, 1);
+               return createSlotManager(resourceManagerId, 
resourceManagerActions, new Configuration());
        }
 
-       private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, int 
numSlotsPerWorker) {
-               SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder()
+       private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, Configuration 
configuration) {
+               return SlotManagerBuilder.newBuilder()
                        .setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC)
-                       .setNumSlotsPerWorker(numSlotsPerWorker)
-                       .build();
-               slotManager.start(resourceManagerId, 
Executors.directExecutor(), resourceManagerActions);
-               return slotManager;
+                       
.setNumSlotsPerWorker(configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+                       
.setMaxSlotNum(configuration.getInteger(ResourceManagerOptions.MAX_SLOT_NUM))
+                       .buildWithDirectExec(resourceManagerId, 
Executors.directExecutor(), resourceManagerActions);

Review comment:
       ```suggestion
                        .buildAndStartWithDirectExec(resourceManagerId, 
Executors.directExecutor(), resourceManagerActions);
   ```
   `buildWithDirectExec` already has `WithDirectExec` in its name so I would 
not pass `Executors.directExecutor()` to it but already have it inside 
`buildWithDirectExec`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -808,16 +850,31 @@ private void 
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
                return Optional.empty();
        }
 
-       private boolean isFulfillableByRegisteredSlots(ResourceProfile 
resourceProfile) {
+       private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile 
resourceProfile) {
                for (TaskManagerSlot slot : slots.values()) {
                        if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
                                return true;
                        }
                }
+
+               for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+                       if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
+                               return true;
+                       }
+               }
+
                return false;
        }
 
        private Optional<PendingTaskManagerSlot> 
allocateResource(ResourceProfile requestedSlotResourceProfile) {
+               final int numRegisteredSlots =  getNumberRegisteredSlots();
+               final int numPendingSlots = getNumberPendingTaskManagerSlots();
+               if (numPendingSlots + numRegisteredSlots + numSlotsPerWorker > 
maxSlotNum) {
+                       LOG.warn("Could not allocate {} more slots. The number 
of registered and pending slots is {}, while the maximum is {}.",
+                               numSlotsPerWorker, numPendingSlots + 
numRegisteredSlots, maxSlotNum);
+                       return Optional.empty();
+               }

Review comment:
       nit: potentially this can be deduplicated with 
`isMaxSlotNumExceededAfterRegistration` if we make it something like:
   `isMaxSlotNumExceededAfterRegistration(int maxNewSlotsNumber, 
Supplier<Integer> newSlotsNumberExact)`
   but maybe too complicated.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -804,16 +850,31 @@ private void 
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
                return Optional.empty();
        }
 
-       private boolean isFulfillableByRegisteredSlots(ResourceProfile 
resourceProfile) {
+       private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile 
resourceProfile) {
                for (TaskManagerSlot slot : slots.values()) {
                        if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
                                return true;
                        }
                }
+
+               for (PendingTaskManagerSlot slot : pendingSlots.values()) {

Review comment:
       alright, makes sense

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -1301,13 +1305,14 @@ private SlotRequest createSlotRequest(JobID jobId, 
ResourceProfile resourceProfi
        }
 
        private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
-               return createSlotManager(resourceManagerId, 
resourceManagerActions, 1);
+               return createSlotManager(resourceManagerId, 
resourceManagerActions, new Configuration());
        }
 
-       private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, int 
numSlotsPerWorker) {
+       private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, Configuration 
configuration) {

Review comment:
       You could have a method to return a common builder for this test suite 
`SlotManagerBuilder.newBuilder().setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC)`.
   
   I would then use explicitly other setters and `buildWithDirectExec`.
   The reason we use builders is to improve readability to avoid methods with a 
lot of parameters which are not named in java. Setters basically make them 
named for the price of some duplication and verbosity.
   
   I would also agree to keep the existing `createSlotManager` with 3 args 
(`NUM_TASK_SLOTS`) but adding a 4th arg for `MAX_SLOT_NUM` may be too much 
indeed.
   
   imo, passing `Configuration` makes some implicit contract which is not 
obvious that it is used only for `NUM_TASK_SLOTS` and `MAX_SLOT_NUM`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to