[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364100#comment-16364100 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168185682
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                    container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                    new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                        createTaskExecutorLaunchContext(
    -                                container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    +
    +                final String containerIdStr = container.getId().toString();
    +
    +                workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +                try {
    +                    // Context information used to start a TaskExecutor Java process
    +                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +                        container.getResource(),
    +                        containerIdStr,
    +                        container.getNodeId().getHost());
    +
    +                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +                } catch (Throwable t) {
    +                    log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +                    // release the failed container
    +                    resourceManagerClient.releaseAssignedContainer(container.getId());
    +                    // and ask for a new one
    +                    requestYarnContainer(container.getResource(), container.getPriority());
    +                }
    +            } else {
    +                // return the excessive containers
    +                log.info("Returning excess container {}.", container.getId());
                     resourceManagerClient.releaseAssignedContainer(container.getId());
    -                requestYarnContainer(container.getResource(), container.getPriority());
                 }
             }
    +
    +        // if we are waiting for no further containers, we can go to the
    +        // regular heartbeat interval
             if (numPendingContainerRequests <= 0) {
    --- End diff --
    
    I think `numPendingContainerRequests` going negative should not be possible.
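A minimal sketch of the invariant this remark relies on, assuming `numPendingContainerRequests` is only decremented in the guarded branch of `onContainersAllocated` and only incremented when a container is requested. `PendingContainerCounter` and its methods are hypothetical names for illustration, not Flink API. Under that assumption the counter can never drop below zero, so the `Math.max(0, ...)` inside the `> 0` guard acts like a plain decrement and the `<= 0` check acts like `== 0`.

```java
/**
 * Hypothetical model of the pending-request bookkeeping in the diff above;
 * not part of Flink. It tracks only the counter, not real YARN containers.
 */
public class PendingContainerCounter {
    private int numPendingContainerRequests;

    public PendingContainerCounter(int initialRequests) {
        if (initialRequests < 0) {
            throw new IllegalArgumentException("initialRequests must be >= 0");
        }
        this.numPendingContainerRequests = initialRequests;
    }

    /** Mirrors asking YARN for one more container. */
    public void requestContainer() {
        numPendingContainerRequests++;
    }

    /**
     * Mirrors the guarded branch of onContainersAllocated: returns true if the
     * container fills a pending request, false if it is excess and should be returned.
     */
    public boolean onContainerAllocated() {
        if (numPendingContainerRequests > 0) {
            // Guarded decrement: never runs when the counter is 0, so it cannot go negative.
            numPendingContainerRequests--;
            return true;
        }
        return false;
    }

    public int pending() {
        return numPendingContainerRequests;
    }

    public static void main(String[] args) {
        PendingContainerCounter counter = new PendingContainerCounter(1);
        System.out.println(counter.onContainerAllocated()); // true: fills the one pending request
        System.out.println(counter.onContainerAllocated()); // false: excess container, counter stays 0
        System.out.println(counter.pending());              // 0
    }
}
```

If the counter could be modified elsewhere (for example, on a different thread or in another callback), this reasoning would no longer hold on its own.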


> Return excess container in YarnResourceManager
> ----------------------------------------------
>
>                 Key: FLINK-8613
>                 URL: https://issues.apache.org/jira/browse/FLINK-8613
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, YARN
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} should return excess containers that the Yarn
> ResourceManager wrongly assigned.
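A minimal sketch of what returning excess containers means for a single allocation callback, assuming YARN delivers a batch of containers while some number of requests are still pending. Container ids are plain strings here, and `ContainerSplit` and `split` are hypothetical names, not Flink or YARN API. In the diff above, the accepted containers correspond to the ones that are started and the excess ones to those handed back with `releaseAssignedContainer`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not Flink or YARN API): given the number of outstanding
 * container requests and a batch of newly allocated container ids, decide which
 * containers to keep and which excess ones to hand back.
 */
public final class ContainerSplit {
    public final List<String> accepted;
    public final List<String> excess;
    public final int remainingPending;

    private ContainerSplit(List<String> accepted, List<String> excess, int remainingPending) {
        this.accepted = accepted;
        this.excess = excess;
        this.remainingPending = remainingPending;
    }

    /** Keeps at most {@code pending} containers in allocation order; the rest are excess. */
    public static ContainerSplit split(int pending, List<String> allocatedContainerIds) {
        List<String> accepted = new ArrayList<>();
        List<String> excess = new ArrayList<>();
        int remaining = pending;
        for (String containerId : allocatedContainerIds) {
            if (remaining > 0) {
                // Fills an outstanding request: this container would be started.
                remaining--;
                accepted.add(containerId);
            } else {
                // No request left for it: this container would be released back to YARN.
                excess.add(containerId);
            }
        }
        return new ContainerSplit(accepted, excess, remaining);
    }

    public static void main(String[] args) {
        ContainerSplit result = split(2, List.of("c1", "c2", "c3"));
        // prints "[c1, c2] [c3] 0"
        System.out.println(result.accepted + " " + result.excess + " " + result.remainingPending);
    }
}
```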



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
