GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4729#discussion_r145067245
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
@Override
- public void stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
+ public void stopWorker(YarnWorkerNode workerNode) {
+ workerNodeMap.remove(workerNode.getResourceID().toString());
+ if (workerNode != null) {
+ log.info("Stopping container {}.", workerNode.getYarnContainer().getId().toString());
+ resourceManagerClient.releaseAssignedContainer(workerNode.getYarnContainer().getId());
--- End diff --
Yes, I think that's also how we do it in the old `YarnFlinkResourceManager`.
---