xintongsong commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r505196788



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
                return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
        }
 
-       private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+       private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
                final List<Container> containersFromPreviousAttempts =
                        registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
                final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
                log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+               int maxPriority = 0;
                for (Container container : containersFromPreviousAttempts) {
                        final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
                        recoveredWorkers.add(worker);
+                       maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);

Review comment:
       > I guess there is no easy way to calculate the TaskExecutorProcessSpec from a Container to restore the old priority to TaskExecutorProcessSpec mapping?
   
   Unfortunately no.
   
   > Would it be an option to say that the default TaskExecutorProcessSpec 
always has the priority 1? That way, the behaviour wrt priorities would not 
change for the current Flink version.
   
   Good point. I think the easiest way is to always start the priority from `1` for a new attempt. Since there's only the default `TaskExecutorProcessSpec` in the current version, the priority should never increase within an attempt.
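   
   To make that concrete, here is a minimal sketch of the idea, assuming a hypothetical per-attempt registry (the class and method names below are made up for illustration; only `TaskExecutorProcessSpec` comes from Flink):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch: create one registry per attempt, so priorities
   // always start at 1 and the default spec maps to priority 1 every time.
   class WorkerSpecPriorities<SpecT> {
   	private final Map<SpecT, Integer> priorities = new HashMap<>();
   	private int nextPriority = 1;
   
   	int getPriority(SpecT spec) {
   		// A known spec reuses its priority; an unseen spec gets the next one.
   		return priorities.computeIfAbsent(spec, ignored -> nextPriority++);
   	}
   }
   ```
   
   With only the default `TaskExecutorProcessSpec` in the current version, every attempt would hand out priority `1` and the counter would never advance.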
   
   For each application, the Yarn RM only accepts container requests from one AM at a time. Container requests from the previous attempt will be discarded; only running containers can be recovered by the new AM. Therefore, there should be no problem with the new attempt requesting containers with the same priorities that previous attempts have used.
   
   I was trying to avoid reusing priorities across attempts for better log readability. It's not strictly necessary, because the container id already indicates in which attempt the container was allocated. Since there's a good reason, I'm fine with always starting the priority from 1 in a new attempt.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

