xintongsong commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r505196788
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
 		return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
 	}
 
-	private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+	private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
 		final List<Container> containersFromPreviousAttempts = registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
 		final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
 		log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+		int maxPriority = 0;
 		for (Container container : containersFromPreviousAttempts) {
 			final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
 			recoveredWorkers.add(worker);
+			maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);
Review comment:
> I guess there is no easy way to calculate the TaskExecutorProcessSpec from a Container to restore the old priority to TaskExecutorProcessSpec mapping?

Unfortunately no.
> Would it be an option to say that the default TaskExecutorProcessSpec always has the priority 1? That way, the behaviour wrt priorities would not change for the current Flink version.
Good point. I think the easiest way is to always start the priority from `1` for a new attempt. Since there's only the default `TaskExecutorProcessSpec` in the current version, the priority should never increase within an attempt.

For each application, the Yarn RM should only accept container requests from one AM at a time. Container requests from the previous attempt will be discarded; only running containers can be recovered by the new AM. Therefore, there should be no problem with the new attempt requesting containers with the same priorities the previous attempts have used.
I was trying to avoid reusing priorities across attempts for better log readability. It's not absolutely necessary, because the container id already indicates in which attempt the container was allocated. Since there's a good reason, I'm fine with always starting the priority from 1 in a new attempt.
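To make the two options concrete, here is a minimal sketch (plain ints instead of YARN's `Container`/`Priority` types, and the method names are hypothetical, not Flink API) contrasting the strategy in the diff above, which continues numbering after the highest recovered priority, with the agreed-on strategy of always starting from 1:

```java
import java.util.Arrays;
import java.util.List;

// Hedged sketch of the two priority-numbering strategies discussed above.
// Not the actual Flink implementation; class and method names are illustrative.
public class PriorityRecoverySketch {

	// Option 1 (as in the diff): track the max priority among containers
	// recovered from previous attempts, and let the new attempt continue
	// numbering after it, so priorities stay unique across attempts.
	static int nextPriorityAfterRecovery(List<Integer> recoveredPriorities) {
		int maxPriority = 0;
		for (int p : recoveredPriorities) {
			maxPriority = Math.max(p, maxPriority);
		}
		return maxPriority + 1;
	}

	// Option 2 (agreed on above): always start from 1. Safe because the
	// Yarn RM accepts container requests from only one AM at a time and
	// discards outstanding requests from previous attempts.
	static int nextPriorityFreshAttempt() {
		return 1;
	}

	public static void main(String[] args) {
		List<Integer> recovered = Arrays.asList(1, 3, 2);
		System.out.println(nextPriorityAfterRecovery(recovered)); // 4
		System.out.println(nextPriorityFreshAttempt());           // 1
	}
}
```

With only the default `TaskExecutorProcessSpec`, option 2 means every attempt simply uses priority 1, which also keeps the priority-to-spec mapping stable across Flink versions.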
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]