[ 
https://issues.apache.org/jira/browse/FLINK-7804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400614#comment-16400614
 ] 

ASF GitHub Bot commented on FLINK-7804:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5675#discussion_r174834515
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,67 +328,74 @@ public float getProgress() {
     
        @Override
        public void onContainersCompleted(List<ContainerStatus> list) {
    -           for (ContainerStatus container : list) {
    -                   if (container.getExitStatus() < 0) {
    -                           closeTaskManagerConnection(new ResourceID(
    -                                   container.getContainerId().toString()), 
new Exception(container.getDiagnostics()));
    +           runAsync(() -> {
    +                           for (ContainerStatus container : list) {
    +                                   if (container.getExitStatus() < 0) {
    +                                           closeTaskManagerConnection(new 
ResourceID(
    +                                                   
container.getContainerId().toString()), new 
Exception(container.getDiagnostics()));
    +                                   }
    +                                   workerNodeMap.remove(new 
ResourceID(container.getContainerId().toString()));
    +                           }
                        }
    -                   workerNodeMap.remove(new 
ResourceID(container.getContainerId().toString()));
    -           }
    +           );
        }
     
        @Override
        public void onContainersAllocated(List<Container> containers) {
    -           for (Container container : containers) {
    -                   log.info(
    -                           "Received new container: {} - Remaining pending 
container requests: {}",
    -                           container.getId(),
    -                           numPendingContainerRequests);
    -
    -                   if (numPendingContainerRequests > 0) {
    -                           numPendingContainerRequests--;
    -
    -                           final String containerIdStr = 
container.getId().toString();
    -
    -                           workerNodeMap.put(new 
ResourceID(containerIdStr), new YarnWorkerNode(container));
    -
    -                           try {
    -                                   // Context information used to start a 
TaskExecutor Java process
    -                                   ContainerLaunchContext 
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    -                                           container.getResource(),
    -                                           containerIdStr,
    -                                           
container.getNodeId().getHost());
    -
    -                                   
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -                           } catch (Throwable t) {
    -                                   log.error("Could not start TaskManager 
in container {}.", container.getId(), t);
    -
    -                                   // release the failed container
    +           runAsync(() -> {
    +                   for (Container container : containers) {
    +                           log.info(
    +                                   "Received new container: {} - Remaining 
pending container requests: {}",
    +                                   container.getId(),
    +                                   numPendingContainerRequests);
    +
    +                           if (numPendingContainerRequests > 0) {
    +                                   numPendingContainerRequests--;
    +
    +                                   final String containerIdStr = 
container.getId().toString();
    +
    +                                   workerNodeMap.put(new 
ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +                                   try {
    +                                           // Context information used to 
start a TaskExecutor Java process
    +                                           ContainerLaunchContext 
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +                                                   container.getResource(),
    +                                                   containerIdStr,
    +                                                   
container.getNodeId().getHost());
    +
    +                                           
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +                                   } catch (Throwable t) {
    +                                           log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
    +
    +                                           // release the failed container
    +                                           
resourceManagerClient.releaseAssignedContainer(container.getId());
    +                                           // and ask for a new one
    +                                           
requestYarnContainer(container.getResource(), container.getPriority());
    +                                   }
    +                           } else {
    +                                   // return the excessive containers
    +                                   log.info("Returning excess container 
{}.", container.getId());
                                        
resourceManagerClient.releaseAssignedContainer(container.getId());
    -                                   // and ask for a new one
    -                                   
requestYarnContainer(container.getResource(), container.getPriority());
                                }
    -                   } else {
    -                           // return the excessive containers
    -                           log.info("Returning excess container {}.", 
container.getId());
    -                           
resourceManagerClient.releaseAssignedContainer(container.getId());
                        }
    -           }
     
    -           // if we are waiting for no further containers, we can go to the
    -           // regular heartbeat interval
    -           if (numPendingContainerRequests <= 0) {
    -                   
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    -           }
    +                   // if we are waiting for no further containers, we can 
go to the
    +                   // regular heartbeat interval
    +                   if (numPendingContainerRequests <= 0) {
    +                           
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    +                   }
    +           });
        }
     
        @Override
        public void onShutdownRequest() {
    -           try {
    -                   shutDown();
    -           } catch (Exception e) {
    -                   log.warn("Fail to shutdown the YARN resource manager.", 
e);
    -           }
    +           runAsync(() -> {
    +                   try {
    +                           shutDown();
    --- End diff --
    
    `shutDown` can be directly called


> YarnResourceManager does not execute AMRMClientAsync callbacks in main thread
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7804
>                 URL: https://issues.apache.org/jira/browse/FLINK-7804
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, YARN
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} registers callbacks at a {{AMRMClientAsync}} 
> which it uses to react to Yarn container allocations. These callbacks (e.g. 
> {{onContainersAllocated}} modify the internal state of the 
> {{YarnResourceManager}}. This can lead to race conditions with the 
> {{requestYarnContainer}} method.
> In order to solve this problem we have to execute the state changing 
> operations in the main thread of the {{YarnResourceManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to