Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6087#discussion_r191162993
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -212,13 +214,12 @@ private void getContainersFromPreviousAttempts(final 
RegisterApplicationMasterRe
                }
        }
     
    -   protected NMClient createAndStartNodeManagerClient(YarnConfiguration 
yarnConfiguration) {
    -           // create the client to communicate with the node managers
    -           NMClient nodeManagerClient = NMClient.createNMClient();
    -           nodeManagerClient.init(yarnConfiguration);
    -           nodeManagerClient.start();
    -           nodeManagerClient.cleanupRunningContainersOnStop(true);
    -           return nodeManagerClient;
    +   protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
    +           // create the client to communicate with the node managers.
    +           NMClientAsync nodeManagerAsyncClient = 
NMClientAsync.createNMClientAsync(new NMClientAsyncCallbackHandler());
    +           nodeManagerAsyncClient.init(yarnConfiguration);
    +           nodeManagerAsyncClient.start();
    --- End diff --
    
    Maybe we should also call this 
`nodeManagerClient.getClient().cleanupRunningContainersOnStop(true);`.


---

Reply via email to