Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4729#discussion_r148808482
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
@Override
- public boolean stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
- return false;
+ public boolean stopWorker(YarnWorkerNode workerNode) {
+ workerNodeMap.remove(workerNode.getResourceID().toString());
+ if (workerNode != null) {
+ Container container = workerNode.getYarnContainer();
+ log.info("Stopping container {}.", container.getId().toString());
+ // release the container on the node manager
+ try {
+ nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+ } catch (Throwable t) {
+ log.error("Error while calling YARN Node Manager to stop container", t);
--- End diff --
This should be logged as a warning (`log.warn`) rather than as an error.
---