Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4729#discussion_r142100776
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile
resourceProfile) {
@Override
public void stopWorker(ResourceID resourceID) {
- // TODO: Implement to stop the worker
+ ContainerId containerId =
containerIdMap.remove(resourceID.getResourceIdString());
+ if (containerId != null) {
+ log.info("Stopping container {}.",
containerId.toString());
+
resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --
Then we should change the `Worker` type to also include the Mesos
`ContainerId` and pass it to the `stopWorker` method.
---