Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r143934519
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
        }
     
        @Override
    -   public void stopWorker(ResourceID resourceID) {
    -           // TODO: Implement to stop the worker
    +   public void stopWorker(YarnWorkerNode workerNode) {
    +           workerNodeMap.remove(workerNode.getResourceID().toString());
    --- End diff --
    
    @tillrohrmann I think we need the workerNodeMap because the task executor
    only has its resourceID when it tries to register with the resource manager.
    The resource manager therefore has to look up the YarnWorkerNode by that
    resourceID when registerTaskExecutor is called. You can look at the
    workerStarted(ResourceID resourceID) method.
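
    To make the lookup concrete, here is a minimal, self-contained sketch of the
    idea. It is not the code in this PR: WorkerNode and ResourceManagerSketch are
    hypothetical stand-ins for YarnWorkerNode and YarnResourceManager, the
    resource ID is simplified to a plain String, and onContainerAllocated is an
    assumed name for wherever the map entry gets created.

```java
import java.util.HashMap;
import java.util.Map;

class WorkerNodeLookupSketch {

    /** Hypothetical stand-in for YarnWorkerNode: a resource ID plus container details. */
    static final class WorkerNode {
        private final String resourceId;
        private final String containerAddress;

        WorkerNode(String resourceId, String containerAddress) {
            this.resourceId = resourceId;
            this.containerAddress = containerAddress;
        }

        String getResourceId() {
            return resourceId;
        }

        String getContainerAddress() {
            return containerAddress;
        }
    }

    /** Hypothetical stand-in for the resource manager's bookkeeping. */
    static final class ResourceManagerSketch {
        // Keyed by the string form of the resource ID, as in the diff above.
        private final Map<String, WorkerNode> workerNodeMap = new HashMap<>();

        /** Assumed hook: remember the node once its container has been allocated. */
        void onContainerAllocated(WorkerNode node) {
            workerNodeMap.put(node.getResourceId(), node);
        }

        /**
         * Called during registration, when only the resource ID is known.
         * Returns the matching node, or null if the ID is unknown.
         */
        WorkerNode workerStarted(String resourceId) {
            return workerNodeMap.get(resourceId);
        }

        /** Mirrors the stopWorker change in the diff: drop the map entry. */
        void stopWorker(WorkerNode node) {
            workerNodeMap.remove(node.getResourceId());
        }
    }
}
```

    The point of the map is only that registration arrives with an ID and
    nothing else, so something on the resource manager side has to translate
    that ID back into the node object.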

