azagrebin commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r414556677



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -74,4 +81,20 @@
                        standaloneClusterStartupPeriodTime,
                        AkkaUtils.getTimeoutAsTime(configuration));
        }
+
+       /**
+        * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+        *
+        * @param configuration configuration object
+        * @return the configuration for standalone ResourceManager
+        */
+       private static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {

Review comment:
       ```suggestion
        private static Configuration removeMaxSlotNumberIfSet(Configuration configuration) {
   ```
   As this is now the only thing done here, I would suggest a more explicit name.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -650,14 +658,48 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
-                       if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if (isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
+               final int numReportedNewSlots = initialSlotReport.getNumSlotStatus();
+               final int numRegisteredSlots =  getNumberRegisteredSlots();
+               final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+               // check if the total number exceed before matching pending slot.
+               if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) {
+                       return false;
+               }
+
+               // check how many exceed slots could be consumed by pending slot.
+               final int totalSlotNum = numRegisteredSlots + numPendingSlots + getNumNonPendingReportedNewSlots(initialSlotReport);
+               return totalSlotNum > maxSlotNum;
+       }
+
+       private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
+               final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+               for (SlotStatus slotStatus : slotReport) {
+                       for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+                               if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&

Review comment:
       Why do we need `!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())`?
   It does not look like a `SlotReport` can contain duplicate `TaskManagerSlotId`s.
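
   One possible reason for the guard: the set holds the IDs of *pending* slots, not of reported ones, so it could matter when a report carries several slots with the same resource profile. The sketch below is only an illustration under assumptions. It assumes the truncated second condition is the exact-profile match and that the method returns the report size minus the number of matched pending slots. `Pending`, the string profiles, and `countNonPending` are hypothetical stand-ins, not Flink classes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PendingSlotGuardSketch {

    /** Hypothetical stand-in for PendingTaskManagerSlot: an ID plus a profile name. */
    static final class Pending {
        final int id;
        final String profile;

        Pending(int id, String profile) {
            this.id = id;
            this.profile = profile;
        }
    }

    /**
     * Counts reported slots that do not fulfil any pending slot.
     * Assumption: mirrors the shape of getNumNonPendingReportedNewSlots in the diff.
     */
    static int countNonPending(List<String> reportedProfiles, List<Pending> pendingSlots, boolean useGuard) {
        final Set<Integer> matchingPendingSlots = new HashSet<>();

        for (String reported : reportedProfiles) {
            for (Pending pending : pendingSlots) {
                final boolean alreadyMatched = matchingPendingSlots.contains(pending.id);
                // With the guard, a pending slot that is already matched is skipped,
                // so the next reported slot can be paired with a different pending slot.
                if ((!useGuard || !alreadyMatched) && pending.profile.equals(reported)) {
                    matchingPendingSlots.add(pending.id);
                    break;
                }
            }
        }
        return reportedProfiles.size() - matchingPendingSlots.size();
    }

    public static void main(String[] args) {
        final List<String> reported = List.of("small", "small");
        final List<Pending> pending = List.of(new Pending(1, "small"), new Pending(2, "small"));

        System.out.println("with guard:    " + countNonPending(reported, pending, true));
        System.out.println("without guard: " + countNonPending(reported, pending, false));
    }
}
```

   In this sketch, two reported slots and two pending slots of the same profile pair up one-to-one only with the guard. Without it, both reported slots hit the first pending slot and one of them is counted as non-pending.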

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -390,6 +392,12 @@ public void registerTaskManager(final TaskExecutorConnection taskExecutorConnect
                if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
                } else {
+                       if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+                               LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);

Review comment:
       If we never allocate more than the maximum in active RMs (`allocateResource`), can this happen at all?
   The only exception would be standalone mode, where we say we do not want this limitation.
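
   To make the scenario concrete, here is a minimal sketch of the two-stage check from the diff using plain integers. The class and parameter names are hypothetical, and `reportedNonPending` stands in for the result of `getNumNonPendingReportedNewSlots`. It suggests the limit is reported as exceeded only when reported slots do not correspond to pending ones.

```java
final class MaxSlotCheckSketch {

    /**
     * Sketch of isMaxSlotNumExceededAfterRegistration with the counts passed in.
     * Assumption: reported slots that fulfil a pending slot are already covered
     * by the pending count, so only the non-pending ones add to the total.
     */
    static boolean isMaxSlotNumExceeded(
            int registered, int pending, int reported, int reportedNonPending, int maxSlotNum) {
        // Stage 1: even counting every reported slot as new, the limit holds.
        if (registered + pending + reported <= maxSlotNum) {
            return false;
        }
        // Stage 2: only reported slots that match no pending slot are really new.
        return registered + pending + reportedNonPending > maxSlotNum;
    }

    public static void main(String[] args) {
        final int maxSlotNum = 10;

        // 6 registered, 3 pending, 2 reported; both reported slots fulfil pending ones.
        System.out.println(isMaxSlotNumExceeded(6, 3, 2, 0, maxSlotNum));

        // Same counts, but neither reported slot matches a pending one.
        System.out.println(isMaxSlotNumExceeded(6, 3, 2, 2, maxSlotNum));
    }
}
```

   With these numbers the first call yields `false` (6 + 3 + 0 = 9) and the second yields `true` (6 + 3 + 2 = 11), which is the case the question above is asking about.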
   





