Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r141820967
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
     
        @Override
        public void stopWorker(ResourceID resourceID) {
    -           // TODO: Implement to stop the worker
    +           ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString());
    +           if (containerId != null) {
    +                   log.info("Stopping container {}.", containerId.toString());
    +                   resourceManagerClient.releaseAssignedContainer(containerId);
    --- End diff --
    
    A YARN ContainerId can only be created with ContainerId.newInstance, which
    requires both the application attempt ID and the container ID. The resourceID
    contains only the container ID, so it is not sufficient to rebuild the YARN
    ContainerId object that the releaseAssignedContainer method takes.
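
    The containerIdMap lookup in the diff appears to follow from this: the full
    ID is remembered when the container is allocated and looked up again when the
    worker is stopped. A minimal, self-contained sketch of that pattern is shown
    here. FullContainerId and ContainerRegistry are hypothetical stand-ins
    invented for illustration; they are not the Hadoop or Flink classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a full container ID: it carries both parts the
// comment mentions (application attempt and container number).
final class FullContainerId {
    final String applicationAttemptId;
    final long containerNumber;

    FullContainerId(String applicationAttemptId, long containerNumber) {
        this.applicationAttemptId = applicationAttemptId;
        this.containerNumber = containerNumber;
    }
}

// Hypothetical registry: stores the full ID under the resource ID string so
// that it never has to be rebuilt from the resource ID alone.
final class ContainerRegistry {
    private final Map<String, FullContainerId> byResourceId = new ConcurrentHashMap<>();

    // Called when a container is allocated.
    void register(String resourceIdString, FullContainerId id) {
        byResourceId.put(resourceIdString, id);
    }

    // Called when a worker is stopped: removes and returns the stored ID,
    // or an empty Optional if the resource ID is unknown.
    Optional<FullContainerId> release(String resourceIdString) {
        return Optional.ofNullable(byResourceId.remove(resourceIdString));
    }
}
```

    Because release removes the entry, a second call for the same resource ID
    returns an empty result, which mirrors the null check in the diff.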

