Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4729#discussion_r141820967
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile
resourceProfile) {
@Override
public void stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
+ ContainerId containerId =
containerIdMap.remove(resourceID.getResourceIdString());
+ if (containerId != null) {
+ log.info("Stopping container {}.",
containerId.toString());
+
resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --
The ContainerId for YARN can be only generated using
ContainerId.newInstance, which requires the application attempt id and the
container Id. The resourceID only contains the container Id information, so
it's not sufficient to rebuild the YARN ContainerId object that the
releaseAssignedContainer method takes.
---