1996fanrui commented on code in PR #25134:
URL: https://github.com/apache/flink/pull/25134#discussion_r1701252201
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -202,10 +206,73 @@ protected void onReleaseTaskManager(ResourceCounter
previouslyFulfilledRequireme
@VisibleForTesting
void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
+ if (pendingRequests.isEmpty()) {
+ return;
+ }
+
+ if (slotBatchAllocatable) {
+ newSlotsAvailableForSlotBatchAllocatable(newSlots);
+ } else {
+ newSlotsAvailableForNonSlotBatchAllocatable(newSlots);
+ }
+ }
+
+ private void newSlotsAvailableForSlotBatchAllocatable(
+ Collection<? extends PhysicalSlot> newSlots) {
+ log.debug("Received new available slots: {}", newSlots);
+
+ final FreeSlotInfoTracker freeSlotInfoTracker =
+ getDeclarativeSlotPool().getFreeSlotInfoTracker();
+
+ final int slotsNum = freeSlotInfoTracker.getAvailableSlots().size();
+ if (slotsNum < pendingRequests.size()) {
+ // Do nothing and waiting slots.
+ log.debug(
+ "The number of available slots: {}, the required number of
slots: {}, waiting for more available slots.",
+ slotsNum,
+ pendingRequests.size());
+ return;
+ }
+
+ final Collection<PhysicalSlot> availableSlots =
+
freeSlotInfoTracker.getFreeSlotsInformation(PhysicalSlot.class);
+ final Collection<RequestSlotMatchingStrategy.RequestSlotMatch>
requestSlotMatches =
+ requestSlotMatchingStrategy.matchRequestsAndSlots(
+ availableSlots, pendingRequests.values());
+ if (requestSlotMatches.size() == pendingRequests.size()) {
+ reserveMatchedFreeSlots(requestSlotMatches);
+ fulfillMatchedSlots(requestSlotMatches);
+ } else if (requestSlotMatches.size() < pendingRequests.size()) {
+ // Do nothing and waiting slots.
+ log.debug(
+ "Ignored the matched results: {}, pendingRequests: {},
waiting for more available slots.",
+ requestSlotMatches,
+ pendingRequests);
+ } else {
+ // For requestSlotMatches.size() > pendingRequests.size()
+ throw new IllegalStateException(
+ "The number of matched slots is not equals to the
pendingRequests.");
+ }
+ }
+
+ private void newSlotsAvailableForNonSlotBatchAllocatable(
Review Comment:
```suggestion
private void newSlotsAvailableForDirectlyAllocatable(
```
how about updating the name to `newSlotsAvailableForDirectlyAllocatable`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -221,7 +288,10 @@ void newSlotsAreAvailable(Collection<? extends
PhysicalSlot> newSlots) {
slot.getAllocationId(),
pendingRequest.getResourceProfile());
}
+ }
+ private void fulfillMatchedSlots(
Review Comment:
nit: all callers call `reserveMatchedFreeSlots` and `fulfillMatchedSlots`
together, why do we need 2 methods?
As I understand, merge them into one method is enough:
`reserveAndFulfillMatchedFreeSlots`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/FreeSlotInfoTracker.java:
##########
@@ -58,7 +58,19 @@ public interface FreeSlotInfoTracker {
* @return a list of {@link SlotInfo} objects about all slots that are
currently available in
* the slot pool.
*/
- Collection<SlotInfo> getFreeSlotsInformation();
+ default Collection<SlotInfo> getFreeSlotsInformation() {
+ return getFreeSlotsInformation(SlotInfo.class);
+ }
+
+ /**
+ * Returns a list of {@link SlotInfo} objects about all slots that are
currently available in
+ * the slot pool.
+ *
+ * @param klass The class of the type of the elements in the result.
+ * @return a list of {@link SlotInfo} objects about all slots that are
currently available in
+ * the slot pool.
+ */
+ <T extends SlotInfo> Collection<T> getFreeSlotsInformation(Class<T> klass);
Review Comment:
`PhysicalSlot` extends `SlotInfo`, so could we return `PhysicalSlot` here
directly? It doesn't effect all old code.
If so, we could refactor `FreeSlotInfoTracker` to `FreePhysicalSlotTracker`
in this PR(by a separate commit), and update all `SlotInfo` to `PhysicalSlot`.
In addition, the comment of `FreeSlotInfoTracker` should be changed together,
it not only supports `SlotSelectionStrategy`.
Also, the comment of `getFreeSlotsInformation(Class<T> klass);` is copied
from `getFreeSlotsInformation()`, it's a little unsuitable here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]