GJL commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r412252028



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -74,4 +81,21 @@
                        standaloneClusterStartupPeriodTime,
                        AkkaUtils.getTimeoutAsTime(configuration));
        }
+
+       /**
+        * Get the configuration for standalone ResourceManager, overwrite 
invalid configs.
+        *
+        * @param configuration configuration object
+        * @return the configuration for standalone ResourceManager
+        */
+       private static Configuration 
getConfigurationForStandaloneResourceManager(Configuration configuration) {
+               final Configuration copiedConfig = new 
Configuration(configuration);
+               if 
(configuration.contains(ResourceManagerOptions.MAX_SLOT_NUM)) {
+                       // The max slot limit should not take effect for 
standalone cluster, we overwrite the configure in case user
+                       // sets this value by mistake.
+                       LOG.warn("The {} should not take effect for standalone 
cluster, If configured, it will be ignored.", 
ResourceManagerOptions.MAX_SLOT_NUM.key());

Review comment:
       Maybe 
   ```suggestion
                        LOG.warn("Config option {} will be ignored in 
standalone mode", ResourceManagerOptions.MAX_SLOT_NUM.key());
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,48 @@ private TaskManagerSlot 
createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot 
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-                       if 
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if 
(pendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, 
resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean 
pendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot 
pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return 
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport 
slotReport) {

Review comment:
       Maybe `slotReport` -> `initialSlotReport` to make it more obvious when 
this method is called.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,48 @@ private TaskManagerSlot 
createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot 
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-                       if 
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if 
(pendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, 
resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean 
pendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot 
pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return 
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport 
slotReport) {

Review comment:
       Maybe prefix boolean methods with `isXXX`
   
   ```suggestion
        private boolean isMaxSlotNumExceededAfterRegistration(SlotReport 
initialSlotReport) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -804,16 +850,31 @@ private void 
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
                return Optional.empty();
        }
 
-       private boolean isFulfillableByRegisteredSlots(ResourceProfile 
resourceProfile) {
+       private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile 
resourceProfile) {
                for (TaskManagerSlot slot : slots.values()) {
                        if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
                                return true;
                        }
                }
+
+               for (PendingTaskManagerSlot slot : pendingSlots.values()) {

Review comment:
       I am not very familiar with the `SlotManager`. Can you explain me why we 
did not do this check/loop before?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -386,6 +392,12 @@ public void registerTaskManager(final 
TaskExecutorConnection taskExecutorConnect
                if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                        
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
                } else {
+                       if 
(totalSlotNumExceedMaxLimitAfterRegister(initialSlotReport)) {
+                               LOG.warn("The total number of slots exceeds the 
max limitation {}, release the excess resource.", maxSlotNum);

Review comment:
       Should this really be a warning, or should it be info?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,48 @@ private TaskManagerSlot 
createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot 
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-                       if 
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if 
(pendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, 
resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean 
pendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot 
pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return 
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport 
slotReport) {
+               final int numReportedNewSlots = slotReport.getNumSlotStatus();
+               final int numRegisteredSlots =  getNumberRegisteredSlots();
+               final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+               // check if the total number exceed before matching pending 
slot.
+               if (numRegisteredSlots + numPendingSlots + numReportedNewSlots 
<= maxSlotNum) {

Review comment:
       Is this check to improve performance and avoid the nested for-loop in 
`getNumNonPendingReportedNewSlots()`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -74,4 +81,21 @@
                        standaloneClusterStartupPeriodTime,
                        AkkaUtils.getTimeoutAsTime(configuration));
        }
+
+       /**
+        * Get the configuration for standalone ResourceManager, overwrite 
invalid configs.
+        *
+        * @param configuration configuration object
+        * @return the configuration for standalone ResourceManager
+        */
+       private static Configuration 
getConfigurationForStandaloneResourceManager(Configuration configuration) {
+               final Configuration copiedConfig = new 
Configuration(configuration);
+               if 
(configuration.contains(ResourceManagerOptions.MAX_SLOT_NUM)) {
+                       // The max slot limit should not take effect for 
standalone cluster, we overwrite the configure in case user
+                       // sets this value by mistake.
+                       LOG.warn("The {} should not take effect for standalone 
cluster, If configured, it will be ignored.", 
ResourceManagerOptions.MAX_SLOT_NUM.key());
+                       
copiedConfig.removeConfig(ResourceManagerOptions.MAX_SLOT_NUM);

Review comment:
       Optional; You don't need to use `.contains()` because `removeConfig()` 
returns a boolean that is `true` if the config was changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to