1996fanrui commented on code in PR #25134:
URL: https://github.com/apache/flink/pull/25134#discussion_r1701252201


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -202,10 +206,73 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
     @VisibleForTesting
     void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+        if (pendingRequests.isEmpty()) {
+            return;
+        }
+
+        if (slotBatchAllocatable) {
+            newSlotsAvailableForSlotBatchAllocatable(newSlots);
+        } else {
+            newSlotsAvailableForNonSlotBatchAllocatable(newSlots);
+        }
+    }
+
+    private void newSlotsAvailableForSlotBatchAllocatable(
+            Collection<? extends PhysicalSlot> newSlots) {
+        log.debug("Received new available slots: {}", newSlots);
+
+        final FreeSlotInfoTracker freeSlotInfoTracker =
+                getDeclarativeSlotPool().getFreeSlotInfoTracker();
+
+        final int slotsNum = freeSlotInfoTracker.getAvailableSlots().size();
+        if (slotsNum < pendingRequests.size()) {
+            // Do nothing and waiting slots.
+            log.debug(
+                    "The number of available slots: {}, the required number of slots: {}, waiting for more available slots.",
+                    slotsNum,
+                    pendingRequests.size());
+            return;
+        }
+
+        final Collection<PhysicalSlot> availableSlots =
+                freeSlotInfoTracker.getFreeSlotsInformation(PhysicalSlot.class);
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches =
+                requestSlotMatchingStrategy.matchRequestsAndSlots(
+                        availableSlots, pendingRequests.values());
+        if (requestSlotMatches.size() == pendingRequests.size()) {
+            reserveMatchedFreeSlots(requestSlotMatches);
+            fulfillMatchedSlots(requestSlotMatches);
+        } else if (requestSlotMatches.size() < pendingRequests.size()) {
+            // Do nothing and waiting slots.
+            log.debug(
+                    "Ignored the matched results: {}, pendingRequests: {}, waiting for more available slots.",
+                    requestSlotMatches,
+                    pendingRequests);
+        } else {
+            // For requestSlotMatches.size() > pendingRequests.size()
+            throw new IllegalStateException(
+                    "The number of matched slots is not equals to the pendingRequests.");
+        }
+    }
+
+    private void newSlotsAvailableForNonSlotBatchAllocatable(

Review Comment:
   ```suggestion
       private void newSlotsAvailableForDirectlyAllocatable(
   ```
   
   How about renaming it to `newSlotsAvailableForDirectlyAllocatable`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -221,7 +288,10 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
                     slot.getAllocationId(),
                     pendingRequest.getResourceProfile());
         }
+    }
 
+    private void fulfillMatchedSlots(

Review Comment:
   nit: all callers invoke `reserveMatchedFreeSlots` and `fulfillMatchedSlots` together, so why do we need two methods?
   
   As I understand it, merging them into a single method, `reserveAndFulfillMatchedFreeSlots`, is enough.
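   
   A rough sketch of what the merged method could look like. `Match`, `reserved`, and `fulfilled` are hypothetical stand-ins for `RequestSlotMatch` and the slot pool's bookkeeping, not the real Flink types; the sketch only assumes that the existing order (reserve every matched slot first, then fulfill every request) is kept:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   // Hypothetical, simplified model; the real Flink types and signatures differ.
   class MergedFulfillSketch {
       // Stand-in for RequestSlotMatchingStrategy.RequestSlotMatch.
       static final class Match {
           final String requestId;
           final String slotId;
   
           Match(String requestId, String slotId) {
               this.requestId = requestId;
               this.slotId = slotId;
           }
       }
   
       final List<String> reserved = new ArrayList<>();
       final List<String> fulfilled = new ArrayList<>();
   
       // Single entry point replacing the reserve/fulfill pair.
       void reserveAndFulfillMatchedFreeSlots(List<Match> matches) {
           // Step 1: reserve every matched free slot (assumed stand-in for the reserve call).
           for (Match match : matches) {
               reserved.add(match.slotId);
           }
           // Step 2: fulfill every pending request with its reserved slot.
           for (Match match : matches) {
               fulfilled.add(match.requestId);
           }
       }
   
       public static void main(String[] args) {
           MergedFulfillSketch pool = new MergedFulfillSketch();
           pool.reserveAndFulfillMatchedFreeSlots(
                   Arrays.asList(new Match("request-1", "slot-a"), new Match("request-2", "slot-b")));
           System.out.println(pool.reserved + " " + pool.fulfilled);
       }
   }
   ```
   
   With a single method, callers cannot forget one of the two steps or call them in the wrong order.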



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -58,7 +58,19 @@ public interface FreeSlotInfoTracker {
      * @return a list of {@link SlotInfo} objects about all slots that are currently available in
      *     the slot pool.
      */
-    Collection<SlotInfo> getFreeSlotsInformation();
+    default Collection<SlotInfo> getFreeSlotsInformation() {
+        return getFreeSlotsInformation(SlotInfo.class);
+    }
+
+    /**
+     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in
+     * the slot pool.
+     *
+     * @param klass The class of the type of the elements in the result.
+     * @return a list of {@link SlotInfo} objects about all slots that are currently available in
+     *     the slot pool.
+     */
+    <T extends SlotInfo> Collection<T> getFreeSlotsInformation(Class<T> klass);

Review Comment:
   `PhysicalSlot` extends `SlotInfo`, so could we return `PhysicalSlot` here directly? It wouldn't affect any of the existing code.
   
   If so, we could rename `FreeSlotInfoTracker` to `FreePhysicalSlotTracker` in this PR (in a separate commit) and change all `SlotInfo` usages to `PhysicalSlot`. The comment on `FreeSlotInfoTracker` should be updated at the same time, since the tracker no longer supports only `SlotSelectionStrategy`.
   
   Also, the comment on `getFreeSlotsInformation(Class<T> klass)` is copied from `getFreeSlotsInformation()`, so it doesn't quite fit here.
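   
   A minimal sketch of the idea, using simplified stand-in interfaces (the `id()` and `isLocal()` methods and the `SimpleSlot` class are hypothetical, not Flink's actual API). It assumes existing callers only read `SlotInfo` members from the returned elements; a caller that assigns the result to a `Collection<SlotInfo>` variable would need `Collection<? extends SlotInfo>` instead, because Java generics are invariant:
   
   ```java
   import java.util.Arrays;
   import java.util.Collection;
   
   // Hypothetical, simplified model; the real Flink interfaces have different members.
   class PhysicalSlotReturnSketch {
       interface SlotInfo {
           String id();
       }
   
       interface PhysicalSlot extends SlotInfo {
           boolean isLocal();
       }
   
       // Renamed tracker that returns the more specific type, with no Class<T> token.
       interface FreePhysicalSlotTracker {
           Collection<PhysicalSlot> getFreeSlotsInformation();
       }
   
       static final class SimpleSlot implements PhysicalSlot {
           private final String id;
   
           SimpleSlot(String id) {
               this.id = id;
           }
   
           @Override
           public String id() {
               return id;
           }
   
           @Override
           public boolean isLocal() {
               return true;
           }
       }
   
       // Existing-style consumer that only needs SlotInfo; it keeps compiling unchanged.
       static int countSlots(Collection<? extends SlotInfo> slots) {
           int count = 0;
           for (SlotInfo slot : slots) {
               if (slot.id() != null) {
                   count++;
               }
           }
           return count;
       }
   
       public static void main(String[] args) {
           FreePhysicalSlotTracker tracker =
                   () -> Arrays.<PhysicalSlot>asList(new SimpleSlot("a"), new SimpleSlot("b"));
           // New-style consumer: PhysicalSlot members are available without a cast.
           for (PhysicalSlot slot : tracker.getFreeSlotsInformation()) {
               System.out.println(slot.id() + " local=" + slot.isLocal());
           }
           System.out.println(countSlots(tracker.getFreeSlotsInformation()));
       }
   }
   ```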



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
