GitHub user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6087#discussion_r191162993
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -212,13 +214,12 @@ private void getContainersFromPreviousAttempts(final RegisterApplicationMasterRe
}
}
- protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
- // create the client to communicate with the node managers
- NMClient nodeManagerClient = NMClient.createNMClient();
- nodeManagerClient.init(yarnConfiguration);
- nodeManagerClient.start();
- nodeManagerClient.cleanupRunningContainersOnStop(true);
- return nodeManagerClient;
+ protected NMClientAsync createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
+ // create the client to communicate with the node managers.
+ NMClientAsync nodeManagerAsyncClient = NMClientAsync.createNMClientAsync(new NMClientAsyncCallbackHandler());
+ nodeManagerAsyncClient.init(yarnConfiguration);
+ nodeManagerAsyncClient.start();
--- End diff --
Maybe we should also call
`nodeManagerAsyncClient.getClient().cleanupRunningContainersOnStop(true);` here.
The removed `NMClient` version made this call, and the added lines shown above no longer do.
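
To illustrate the suggestion, here is a minimal sketch of the delegation pattern. It uses hypothetical stand-in classes (`FakeNMClient`, `FakeNMClientAsync`) instead of the Hadoop types so that it runs without YARN on the classpath. The stand-ins model only what matters here: the async client wraps a synchronous client, and the cleanup flag is reached through `getClient()`. The `init(yarnConfiguration)` step and the callback handler are left out.

```java
// Hypothetical stand-in for the synchronous NMClient; only the cleanup flag is modeled.
class FakeNMClient {
    private boolean cleanupOnStop = false;

    void cleanupRunningContainersOnStop(boolean enabled) {
        this.cleanupOnStop = enabled;
    }

    boolean isCleanupOnStop() {
        return cleanupOnStop;
    }
}

// Hypothetical stand-in for NMClientAsync: it wraps a synchronous client
// and exposes it through getClient().
class FakeNMClientAsync {
    private final FakeNMClient client = new FakeNMClient();
    private boolean started = false;

    void start() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }

    FakeNMClient getClient() {
        return client;
    }
}

class NodeManagerClientSketch {
    // Mirrors the shape of createAndStartNodeManagerClient in the diff,
    // with the suggested cleanup call added after start().
    static FakeNMClientAsync createAndStartNodeManagerClient() {
        FakeNMClientAsync nodeManagerAsyncClient = new FakeNMClientAsync();
        nodeManagerAsyncClient.start();
        // The suggested call: set the flag on the wrapped synchronous client.
        nodeManagerAsyncClient.getClient().cleanupRunningContainersOnStop(true);
        return nodeManagerAsyncClient;
    }

    public static void main(String[] args) {
        FakeNMClientAsync client = createAndStartNodeManagerClient();
        System.out.println("cleanup on stop: " + client.getClient().isCleanupOnStop());
    }
}
```

In the actual method the same single line would go right after `nodeManagerAsyncClient.start();`, assuming the async client keeps delegating to the wrapped `NMClient` when it is stopped.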
---