tillrohrmann commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r504657264
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -260,36 +259,44 @@ public void releaseResource(YarnWorkerNode workerNode) {
     // Internal
     // ------------------------------------------------------------------------
-    private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
-        final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
-            taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
-                .flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
-                .collect(Collectors.toList());
+    private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
+        final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.TaskExecutorProcessSpecAndResource> taskExecutorProcessSpecAndResourceOpt =
+            taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpecAndResource(priority);
+
+        if (!taskExecutorProcessSpecAndResourceOpt.isPresent()) {
+            log.warn("Receive {} containers with unrecognized priority {}. This should not happen.",
+                containers.size(), priority.getPriority());
+            for (Container container : containers) {
+                returnExcessContainer(container);
+            }
+            return;
+        }
Review comment:
Are we not failing hard here because this can happen in case of a
JobManager failover?
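To make the question concrete for readers skimming the thread, here is a minimal sketch of the tolerate-and-release behaviour under discussion. The class, the map, and the handler method are made up for illustration; only `AMRMClientAsync#releaseAssignedContainer` is the real YARN call.

```java
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

final class UnknownPriorityHandling {

    // Hypothetical in-memory priority-to-spec map: it is rebuilt after a JobManager
    // failover, so containers allocated for the previous attempt can arrive with
    // priorities the new map has never seen.
    private final Map<Priority, Object> priorityToSpec;
    private final AMRMClientAsync<?> resourceManagerClient;

    UnknownPriorityHandling(Map<Priority, Object> priorityToSpec, AMRMClientAsync<?> resourceManagerClient) {
        this.priorityToSpec = priorityToSpec;
        this.resourceManagerClient = resourceManagerClient;
    }

    void onContainersAllocated(Priority priority, List<Container> containers) {
        if (!priorityToSpec.containsKey(priority)) {
            // Tolerate the unknown priority instead of failing hard: hand the
            // containers straight back to YARN and keep the ResourceManager alive.
            for (Container container : containers) {
                resourceManagerClient.releaseAssignedContainer(container.getId());
            }
            return;
        }
        // ... normal allocation path would continue here ...
    }
}
```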
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
         return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
     }
-    private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+    private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
         final List<Container> containersFromPreviousAttempts =
             registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
         final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
         log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
+        int maxPriority = 0;
         for (Container container : containersFromPreviousAttempts) {
             final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
             recoveredWorkers.add(worker);
+            maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);
Review comment:
I guess there is no easy way to calculate the `TaskExecutorProcessSpec` from a `Container` in order to restore the old priority-to-`TaskExecutorProcessSpec` mapping?
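For illustration, a rough sketch of what a recovered container actually exposes (this is my own example code, not the PR's): only the coarse, rounded-up `<memory, vcores>` pair and the priority survive, and several distinct `TaskExecutorProcessSpec`s can map to the same pair, so the old spec cannot be uniquely reconstructed from it.

```java
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;

final class RecoveredContainerInfo {

    // A recovered Container only carries its coarse YARN Resource and priority.
    // Distinct TaskExecutorProcessSpecs (different heap/managed/network splits)
    // can round up to the same <memoryMB, vcores> pair, so the mapping is lossy.
    static String describe(Container container) {
        final Resource resource = container.getResource();
        return String.format(
            "container=%s, priority=%d, memoryMB=%d, vcores=%d",
            container.getId(),
            container.getPriority().getPriority(),
            resource.getMemorySize(),
            resource.getVirtualCores());
    }
}
```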
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -170,8 +159,12 @@ protected void initializeInternal() throws Exception {
         resourceManagerClient.start();
         final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
-        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-        updateMatchingStrategy(registerApplicationMasterResponse);
+        int maxPriority = getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+        taskExecutorProcessSpecContainerResourcePriorityAdapter =
+            new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
+                maxPriority + 1,
Review comment:
Will it be a problem that we keep increasing the priority value? Potentially, it can grow arbitrarily large.
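To make the concern concrete, here is a hypothetical sketch of such an ever-growing priority counter (not the adapter's actual code, and the spec key is simplified to a `String`). Since YARN's `Priority` wraps a plain `int`, the value can in principle grow until `Integer.MAX_VALUE`, i.e. it only overflows after roughly 2^31 distinct specs across all attempts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.yarn.api.records.Priority;

// Hypothetical sketch of a monotonically increasing priority generator. Each distinct
// spec (keyed here by an opaque String for brevity) gets the next free priority;
// the counter only ever goes up, which is the growth this comment asks about.
final class PriorityGenerator {

    private final AtomicInteger nextPriority;
    private final Map<String, Priority> specToPriority = new ConcurrentHashMap<>();

    PriorityGenerator(int startingPriority) {
        this.nextPriority = new AtomicInteger(startingPriority);
    }

    Priority priorityFor(String specKey) {
        // YARN's Priority wraps a plain int, so this keeps climbing across attempts;
        // overflow would only occur after ~2^31 distinct specs.
        return specToPriority.computeIfAbsent(
            specKey, ignored -> Priority.newInstance(nextPriority.getAndIncrement()));
    }
}
```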
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
         return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
     }
-    private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+    private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
         final List<Container> containersFromPreviousAttempts =
             registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
         final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
         log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
+        int maxPriority = 0;
         for (Container container : containersFromPreviousAttempts) {
             final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
             recoveredWorkers.add(worker);
+            maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);
Review comment:
Would it be an option to say that the default `TaskExecutorProcessSpec` always has priority `1`? That way, the behaviour w.r.t. priorities would not change for the current Flink version.
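Purely to illustrate the suggestion (class and method names are made up): pin the default `TaskExecutorProcessSpec` to priority `1` and only hand out fresh priorities for additional specs, so a single-spec deployment keeps the priority value it has today.

```java
import org.apache.hadoop.yarn.api.records.Priority;

final class DefaultSpecPriority {

    // Reserved for the default TaskExecutorProcessSpec so that deployments with a
    // single, unchanged spec keep the priority value used by current Flink versions.
    static final Priority DEFAULT_SPEC_PRIORITY = Priority.newInstance(1);

    static Priority priorityFor(boolean isDefaultSpec, int nextFreePriority) {
        return isDefaultSpec ? DEFAULT_SPEC_PRIORITY : Priority.newInstance(nextFreePriority);
    }
}
```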
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -228,21 +221,27 @@ public void deregisterApplication(ApplicationStatus finalStatus, @Nullable Strin
     @Override
     public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
-        final Optional<Resource> containerResourceOptional = getContainerResource(taskExecutorProcessSpec);
+        checkInitialized();
+
         final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
-        if (containerResourceOptional.isPresent()) {
-            resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+        final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
+            taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
+
+        if (!priorityAndResourceOpt.isPresent()) {
+            requestResourceFuture.completeExceptionally(
+                new ResourceManagerException(String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s.", taskExecutorProcessSpec)));
Review comment:
Would a bit more detail be helpful here? Maybe explain why we could not compute the container resource?
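As an illustration of what a more detailed message could look like, a hedged sketch (the helper, the extra parameter, and the stated cause are assumptions; it presumes the adapter rejects specs whose resources exceed YARN's maximum container allocation):

```java
import org.apache.hadoop.yarn.api.records.Resource;

final class ResourceRequestErrors {

    // Hypothetical helper: enrich the failure message with the requested spec and a
    // probable cause, assuming the adapter rejects specs whose resources exceed the
    // maximum container allocation configured in YARN.
    static String couldNotComputeResourceMessage(Object taskExecutorProcessSpec, Resource maxYarnAllocation) {
        return String.format(
            "Could not compute the container Resource from the given TaskExecutorProcessSpec %s. "
                + "This usually means the requested resources exceed the maximum allocation "
                + "that this YARN cluster allows per container (%s).",
            taskExecutorProcessSpec, maxYarnAllocation);
    }
}
```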
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]