Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168197381
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void 
onContainersCompleted(List<ContainerStatus> list) {
        @Override
        public void onContainersAllocated(List<Container> containers) {
                for (Container container : containers) {
    -                   numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
    -                   log.info("Received new container: {} - Remaining 
pending container requests: {}",
    -                                   container.getId(), 
numPendingContainerRequests);
    -                   final String containerIdStr = 
container.getId().toString();
    -                   workerNodeMap.put(new ResourceID(containerIdStr),
    -                                   new YarnWorkerNode(container));
    -                   try {
    -                           /** Context information used to start a 
TaskExecutor Java process */
    -                           ContainerLaunchContext 
taskExecutorLaunchContext =
    -                                           createTaskExecutorLaunchContext(
    -                                                           
container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                           nodeManagerClient.startContainer(container, 
taskExecutorLaunchContext);
    -                   }
    -                   catch (Throwable t) {
    -                           // failed to launch the container, will release 
the failed one and ask for a new one
    -                           log.error("Could not start TaskManager in 
container {},", container, t);
    +                   log.info(
    +                           "Received new container: {} - Remaining pending 
container requests: {}",
    +                           container.getId(),
    +                           numPendingContainerRequests);
    +
    +                   if (numPendingContainerRequests > 0) {
    +                           numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
    +
    +                           final String containerIdStr = 
container.getId().toString();
    +
    +                           workerNodeMap.put(new 
ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +                           try {
    +                                   // Context information used to start a 
TaskExecutor Java process
    +                                   ContainerLaunchContext 
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +                                           container.getResource(),
    +                                           containerIdStr,
    +                                           
container.getNodeId().getHost());
    +
    +                                   
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +                           } catch (Throwable t) {
    +                                   log.error("Could not start TaskManager 
in container {}.", container.getId(), t);
    +
    +                                   // release the failed container
    +                                   
resourceManagerClient.releaseAssignedContainer(container.getId());
    +                                   // and ask for a new one
    +                                   
requestYarnContainer(container.getResource(), container.getPriority());
    +                           }
    +                   } else {
    +                           // return the excessive containers
    +                           log.info("Returning excess container {}.", 
container.getId());
                                
resourceManagerClient.releaseAssignedContainer(container.getId());
    -                           requestYarnContainer(container.getResource(), 
container.getPriority());
                        }
                }
    +
    +           // if we are waiting for no further containers, we can go to the
    +           // regular heartbeat interval
                if (numPendingContainerRequests <= 0) {
    --- End diff --
    
    Yes, indeed. However, the logic should not be affected by using `<=` as the 
comparison operator.


---

Reply via email to