azagrebin commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r415573487
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -650,14 +658,48 @@ private TaskManagerSlot
createAndRegisterTaskManagerSlot(SlotID slotId, Resource
@Nullable
private PendingTaskManagerSlot
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
- if
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+ if
(isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot,
resourceProfile)) {
return pendingTaskManagerSlot;
}
}
return null;
}
+ private boolean
isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot
pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+ return
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+ }
+
+ private boolean isMaxSlotNumExceededAfterRegistration(SlotReport
initialSlotReport) {
+ final int numReportedNewSlots =
initialSlotReport.getNumSlotStatus();
+ final int numRegisteredSlots = getNumberRegisteredSlots();
+ final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+ // check if the total number exceed before matching pending
slot.
+ if (numRegisteredSlots + numPendingSlots + numReportedNewSlots
<= maxSlotNum) {
+ return false;
+ }
+
+ // check how many exceed slots could be consumed by pending
slot.
+ final int totalSlotNum = numRegisteredSlots + numPendingSlots +
getNumNonPendingReportedNewSlots(initialSlotReport);
+ return totalSlotNum > maxSlotNum;
+ }
+
+ private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
+ final Set<TaskManagerSlotId> matchingPendingSlots = new
HashSet<>();
+
+ for (SlotStatus slotStatus : slotReport) {
+ for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
+ if
(!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())
&&
Review comment:
True, sorry, I confused it with `slotReport`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]