Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4729#discussion_r143934519
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
@Override
- public void stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
+ public void stopWorker(YarnWorkerNode workerNode) {
+ workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --
@tillrohrmann I think we need the workerNodeMap because the task executor only
has its ResourceID when it tries to register with the resource manager. The
resource manager therefore has to look up the YarnWorkerNode by that ResourceID
when registerTaskExecutor is called. You can look at the
workerStarted(ResourceID resourceID) method.
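
To illustrate the lookup I mean, here is a rough, self-contained sketch. It is not the actual YarnResourceManager code: `ResourceID` and `YarnWorkerNode` below are simplified stand-ins for the real classes, and `WorkerRegistry` and `onContainerAllocated` are hypothetical names used only for this example. The map is keyed by the string form of the ResourceID, as in the diff above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for Flink's ResourceID (assumption: only the string form matters here).
final class ResourceID {
    private final String id;

    ResourceID(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return id;
    }
}

// Simplified stand-in for YarnWorkerNode; the real class also wraps the YARN container.
final class YarnWorkerNode {
    private final ResourceID resourceID;

    YarnWorkerNode(ResourceID resourceID) {
        this.resourceID = resourceID;
    }

    ResourceID getResourceID() {
        return resourceID;
    }
}

// Hypothetical holder for the bookkeeping; in the PR this state lives in the resource manager.
final class WorkerRegistry {
    private final Map<String, YarnWorkerNode> workerNodeMap = new ConcurrentHashMap<>();

    // Hypothetical hook: remember the node once its container has been allocated.
    void onContainerAllocated(YarnWorkerNode node) {
        workerNodeMap.put(node.getResourceID().toString(), node);
    }

    // The task executor registers with only its ResourceID, so resolve the node from the map.
    Optional<YarnWorkerNode> workerStarted(ResourceID resourceID) {
        return Optional.ofNullable(workerNodeMap.get(resourceID.toString()));
    }

    // Mirrors the diff: drop the bookkeeping entry when the worker is stopped.
    void stopWorker(YarnWorkerNode node) {
        workerNodeMap.remove(node.getResourceID().toString());
    }
}
```

Without the map, `workerStarted` would have no way to get from the ResourceID sent by the task executor back to the node that was created when the container was allocated.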
---