xintongsong commented on a change in pull request #13347:
URL: https://github.com/apache/flink/pull/13347#discussion_r484655148



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -122,8 +128,8 @@ private InternalContainerResource createAndMapContainerResource(final WorkerReso
                final TaskExecutorProcessSpec taskExecutorProcessSpec =
                        TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
                final InternalContainerResource internalContainerResource = new InternalContainerResource(
-                       normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
-                       normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore),
+                       normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), unitMemMB),
+                       normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), unitVcore),

Review comment:
       This should not work.
   The method `normalize` should take both unit and min values into consideration. With the current implementation, there's a chance that the normalized resource is smaller than min.
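   A minimal sketch of what such a combined normalization could look like, assuming a three-argument helper (the signature below is illustrative, not the one in `WorkerSpecContainerResourceAdapter`):
   ```
   // Hypothetical helper: round the requested value up to a multiple of the
   // allocation unit, but never return less than the scheduler minimum.
   private static int normalize(final int value, final int unitValue, final int minValue) {
       final int roundedUpToUnit = ((value + unitValue - 1) / unitValue) * unitValue;
       return Math.max(roundedUpToUnit, minValue);
   }
   ```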

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -105,6 +106,14 @@
 
        private final YarnConfiguration yarnConfig;
 
+       private static final String RM_FAIR_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";

Review comment:
       Not only `FairScheduler` but also `SLSFairScheduler` uses the increment allocation configurations.
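   For illustration, a check covering both schedulers could compare the configured scheduler class against both fully qualified names. This is only a sketch; the `SLSFairScheduler` class name below is my assumption about the Hadoop SLS package and should be verified against the Hadoop version in use.
   ```
   // Sketch: both fair schedulers honor the increment-allocation settings.
   private static final String FAIR_SCHEDULER_CLASS =
       "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";
   // Assumed class name for the SLS wrapper; not taken from this PR.
   private static final String SLS_FAIR_SCHEDULER_CLASS =
       "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler";

   private static boolean isFairSchedulerConfigured(final org.apache.hadoop.yarn.conf.YarnConfiguration yarnConfig) {
       final String scheduler = yarnConfig.get(org.apache.hadoop.yarn.conf.YarnConfiguration.RM_SCHEDULER);
       return FAIR_SCHEDULER_CLASS.equals(scheduler) || SLS_FAIR_SCHEDULER_CLASS.equals(scheduler);
   }
   ```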

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -105,6 +106,14 @@
 
        private final YarnConfiguration yarnConfig;
 
+       private static final String RM_FAIR_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";
+       private static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB = "yarn.scheduler.increment-allocation-mb";
+       private static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;
+       private static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = "yarn.scheduler.increment-allocation-vcores";
+       private static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
+       private static final String RM_RESOURCE_TYPES_INCREMENT_ALLOCATION_MB = "yarn.resource-types.memory-mb.increment-allocation";
+       private static final String RM_RESOURCE_TYPES_INCREMENT_ALLOCATION_VCORES = "yarn.resource-types.vcores.increment-allocation";

Review comment:
       These constants are not related to the main responsibility of `YarnResourceManager`. Let's try to move them somewhere else and keep `YarnResourceManager` simple and clear.
   
   I would suggest introducing a public static util method in `Utils` for creating the `WorkerSpecContainerResourceAdapter`, something like the following.
   ```
   public static WorkerSpecContainerResourceAdapter createWorkerSpecContainerResourceAdapter(Configuration flinkConfig, YarnConfiguration yarnConfig) {
        // ...
   }
   ```
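   To make the suggestion concrete, a rough sketch of such a factory method follows. The increment config keys and defaults are the ones from the diff above, the minimum-allocation constants are Hadoop's `YarnConfiguration` ones, and the `WorkerSpecContainerResourceAdapter` constructor arguments are an assumption about how the adapter would accept the min/unit values, not its actual signature.
   ```
   // Sketch only: the adapter constructor shown here is assumed, not the real one.
   public static WorkerSpecContainerResourceAdapter createWorkerSpecContainerResourceAdapter(
           final Configuration flinkConfig,
           final YarnConfiguration yarnConfig) {

       final int minMemMB = yarnConfig.getInt(
               YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
               YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
       final int minVcore = yarnConfig.getInt(
               YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
               YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);

       // Note: a fuller version would also check whether a fair scheduler
       // (FairScheduler / SLSFairScheduler) is configured before reading these keys.
       final int unitMemMB = yarnConfig.getInt("yarn.scheduler.increment-allocation-mb", 1024);
       final int unitVcore = yarnConfig.getInt("yarn.scheduler.increment-allocation-vcores", 1);

       return new WorkerSpecContainerResourceAdapter(flinkConfig, minMemMB, minVcore, unitMemMB, unitVcore);
   }
   ```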

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -122,8 +128,8 @@ private InternalContainerResource createAndMapContainerResource(final WorkerReso
                final TaskExecutorProcessSpec taskExecutorProcessSpec =
                        TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
                final InternalContainerResource internalContainerResource = new InternalContainerResource(
-                       normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
-                       normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore),
+                       normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), unitMemMB),
+                       normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), unitVcore),

Review comment:
       I think this also reveals that we have failed to cover such scenarios with our test cases.
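   As a starting point, a test along these lines could pin the behavior down. This is a JUnit sketch against the hypothetical three-argument `normalize` helper sketched earlier, not the existing Flink test classes.
   ```
   import static org.junit.Assert.assertEquals;

   import org.junit.Test;

   // Hypothetical test class; names and helper are illustrative only.
   public class ResourceNormalizationSketchTest {

       // Same assumed helper as above: round up to the unit, then clamp to the minimum.
       private static int normalize(final int value, final int unitValue, final int minValue) {
           final int roundedUpToUnit = ((value + unitValue - 1) / unitValue) * unitValue;
           return Math.max(roundedUpToUnit, minValue);
       }

       @Test
       public void normalizedValueIsNeverBelowMinimum() {
           // unit 512 MB, minimum 1024 MB: a 300 MB request must become 1024 MB, not 512 MB.
           assertEquals(1024, normalize(300, 512, 1024));
       }

       @Test
       public void normalizedValueRoundsUpToUnitAboveMinimum() {
           // 1500 MB rounds up to the next 512 MB multiple, 1536 MB.
           assertEquals(1536, normalize(1500, 512, 1024));
       }
   }
   ```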




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

