tillrohrmann commented on a change in pull request #13592:
URL: https://github.com/apache/flink/pull/13592#discussion_r504657264



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -260,36 +259,44 @@ public void releaseResource(YarnWorkerNode workerNode) {
        //  Internal
        // ------------------------------------------------------------------------
 
-       private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
-               final List<TaskExecutorProcessSpec> pendingTaskExecutorProcessSpecs =
-                       taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource, matchingStrategy).stream()
-                               .flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
-                               .collect(Collectors.toList());
+       private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
+               final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.TaskExecutorProcessSpecAndResource> taskExecutorProcessSpecAndResourceOpt =
+                       taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpecAndResource(priority);
+
+               if (!taskExecutorProcessSpecAndResourceOpt.isPresent()) {
+                       log.warn("Receive {} containers with unrecognized priority {}. This should not happen.",
+                               containers.size(), priority.getPriority());
+                       for (Container container : containers) {
+                               returnExcessContainer(container);
+                       }
+                       return;
+               }

Review comment:
       Are we not failing hard here because this can happen in case of a 
JobManager failover?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
                return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
        }
 
-       private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+       private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
                final List<Container> containersFromPreviousAttempts =
                        registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
                final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
                log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+               int maxPriority = 0;
                for (Container container : containersFromPreviousAttempts) {
                        final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
                        recoveredWorkers.add(worker);
+                       maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);

Review comment:
       I guess there is no easy way to calculate the `TaskExecutorProcessSpec` 
from a `Container`, so that we could restore the old mapping from priority to 
`TaskExecutorProcessSpec`?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -170,8 +159,12 @@ protected void initializeInternal() throws Exception {
                        resourceManagerClient.start();
 
                        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
-                       getContainersFromPreviousAttempts(registerApplicationMasterResponse);
-                       updateMatchingStrategy(registerApplicationMasterResponse);
+                       int maxPriority = getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       taskExecutorProcessSpecContainerResourcePriorityAdapter =
+                               new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
+                                       maxPriority + 1,

Review comment:
       Will it be a problem that we keep increasing the priority value? 
Potentially, it can grow arbitrarily large.
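
       To make the concern concrete, here is a minimal sketch of how the 
starting priority could drift upwards across failovers. The class 
`PriorityCounterSketch` and its methods are hypothetical stand-ins, not the 
adapter from this PR, and the sketch assumes that containers with the highest 
priority survive every failover.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the adapter's priority bookkeeping; not Flink's actual class.
final class PriorityCounterSketch {
    private final Map<String, Integer> specToPriority = new HashMap<>();
    private int nextPriority;

    PriorityCounterSketch(int startingPriority) {
        this.nextPriority = startingPriority;
    }

    // Returns the priority of a spec, assigning the next free value on first use.
    int priorityFor(String specName) {
        return specToPriority.computeIfAbsent(specName, ignored -> nextPriority++);
    }

    // Simulates one attempt: start at (recovered max + 1), assign priorities
    // to the given specs, and return the highest priority now in use.
    static int simulateAttempt(int maxPriorityOfRecoveredContainers, String... specs) {
        PriorityCounterSketch counter =
            new PriorityCounterSketch(maxPriorityOfRecoveredContainers + 1);
        int max = maxPriorityOfRecoveredContainers;
        for (String spec : specs) {
            max = Math.max(max, counter.priorityFor(spec));
        }
        return max;
    }

    public static void main(String[] args) {
        int max = 0;
        for (int attempt = 1; attempt <= 3; attempt++) {
            // Assumption: the highest-priority containers are recovered each time.
            max = simulateAttempt(max, "small", "large");
            System.out.println("attempt " + attempt + ": max priority = " + max);
        }
    }
}
```

       Under these assumptions the maximum grows by the number of distinct 
specs on every attempt, so it is bounded only by the number of failovers.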

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -442,38 +444,25 @@ private RegisterApplicationMasterResponse registerApplicationMaster() throws Exc
                return resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
        }
 
-       private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+       private int getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
                final List<Container> containersFromPreviousAttempts =
                        registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
                final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();
 
                log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
 
+               int maxPriority = 0;
                for (Container container : containersFromPreviousAttempts) {
                        final YarnWorkerNode worker = new YarnWorkerNode(container, getContainerResourceId(container));
                        recoveredWorkers.add(worker);
+                       maxPriority = Math.max(container.getPriority().getPriority(), maxPriority);

Review comment:
       Would it be an option to give the default `TaskExecutorProcessSpec` a 
fixed priority of `1`? That way, the behaviour with respect to priorities would 
not change for the current Flink version.
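
       A rough sketch of what I have in mind, with specs modelled as plain 
strings. `DefaultPriorityAdapterSketch` and everything in it is hypothetical 
and only illustrates the idea; it is not a proposal for the actual adapter API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the default spec is pinned to priority 1,
// all other specs get incrementing priorities above it.
final class DefaultPriorityAdapterSketch {
    static final int DEFAULT_PRIORITY = 1;

    private final String defaultSpec;
    private final Map<String, Integer> specToPriority = new HashMap<>();
    private int nextPriority;

    DefaultPriorityAdapterSketch(String defaultSpec, int maxRecoveredPriority) {
        this.defaultSpec = defaultSpec;
        // Never hand out the reserved default priority to another spec.
        this.nextPriority = Math.max(maxRecoveredPriority, DEFAULT_PRIORITY) + 1;
    }

    int priorityFor(String spec) {
        if (defaultSpec.equals(spec)) {
            return DEFAULT_PRIORITY;
        }
        return specToPriority.computeIfAbsent(spec, ignored -> nextPriority++);
    }

    public static void main(String[] args) {
        // Assumption: containers with priorities up to 7 were recovered.
        DefaultPriorityAdapterSketch adapter = new DefaultPriorityAdapterSketch("default", 7);
        System.out.println("default -> " + adapter.priorityFor("default"));
        System.out.println("large   -> " + adapter.priorityFor("large"));
    }
}
```

       With this, the default spec maps to the same priority in every attempt, 
no matter which containers were recovered.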

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -228,21 +221,27 @@ public void deregisterApplication(ApplicationStatus finalStatus, @Nullable Strin
 
        @Override
        public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
-               final Optional<Resource> containerResourceOptional = getContainerResource(taskExecutorProcessSpec);
+               checkInitialized();
+
                final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
 
-               if (containerResourceOptional.isPresent()) {
-                       resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+               final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
+                       taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
+
+               if (!priorityAndResourceOpt.isPresent()) {
+                       requestResourceFuture.completeExceptionally(
+                               new ResourceManagerException(String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s.", taskExecutorProcessSpec)));

Review comment:
       Would a bit more detail be helpful here? For example, why we could not 
compute the container resource?
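
       Something along these lines, purely as an illustration. The diff does 
not show why the computation can fail, so the memory and vcore limits checked 
below are assumptions, and `ResourceFailureReasonSketch` is a hypothetical 
helper rather than existing Flink code.

```java
import java.util.Optional;

// Hypothetical helper that reports why a container resource could not be computed.
final class ResourceFailureReasonSketch {

    // Returns a human-readable reason, or empty if the request fits the (assumed) limits.
    static Optional<String> findFailureReason(
            int requestedMemoryMb, int requestedVcores, int maxMemoryMb, int maxVcores) {
        if (requestedMemoryMb > maxMemoryMb) {
            return Optional.of(String.format(
                "requested memory %d MB exceeds the maximum of %d MB",
                requestedMemoryMb, maxMemoryMb));
        }
        if (requestedVcores > maxVcores) {
            return Optional.of(String.format(
                "requested %d vcores exceed the maximum of %d",
                requestedVcores, maxVcores));
        }
        return Optional.empty();
    }

    // Builds the exception message with the reason appended.
    static String failureMessage(String spec, String reason) {
        return String.format(
            "Could not compute the container Resource from the given TaskExecutorProcessSpec %s: %s.",
            spec, reason);
    }

    public static void main(String[] args) {
        findFailureReason(4096, 1, 2048, 4)
            .ifPresent(reason -> System.out.println(failureMessage("spec-A", reason)));
    }
}
```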





