Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5931#discussion_r187466146
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
        }
     
        @Override
    -   public boolean stopWorker(YarnWorkerNode workerNode) {
    -           if (workerNode != null) {
    -                   Container container = workerNode.getContainer();
    -                   log.info("Stopping container {}.", container.getId());
    -                   // release the container on the node manager
    -                   try {
    -                           
nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    -                   } catch (Throwable t) {
    -                           log.warn("Error while calling YARN Node Manager 
to stop container", t);
    -                   }
    -                   
resourceManagerClient.releaseAssignedContainer(container.getId());
    -                   workerNodeMap.remove(workerNode.getResourceID());
    -           } else {
    -                   log.error("Can not find container for null 
workerNode.");
    +   public boolean stopWorker(final YarnWorkerNode workerNode) {
    +           final Container container = workerNode.getContainer();
    +           log.info("Stopping container {}.", container.getId());
    +           try {
    +                   nodeManagerClient.stopContainer(container.getId(), 
container.getNodeId());
    +           } catch (final Exception e) {
    +                   log.warn("Error while calling YARN Node Manager to stop 
container", e);
                }
    +           
resourceManagerClient.releaseAssignedContainer(container.getId());
    +           workerNodeMap.remove(workerNode.getResourceID());
                return true;
    --- End diff --
    
    @sihuazhou  The body in `onContainersCompleted` is wrapped in `runAsync`
    ```
    protected void runAsync(Runnable runnable)
    Execute the runnable in the main thread of the underlying RPC endpoint.
    Parameters:
    runnable - Runnable to be executed in the main thread of the underlying RPC 
endpoint
    ```


---

Reply via email to