zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917245184


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1449,4 +1491,42 @@ public Void retrievePayload(ResourceID resourceID) {
             return null;
         }
     }
+
+    private class JobMasterBlocklistContext implements BlocklistContext {
+
+        @Override
+        public void blockResources(Collection<BlockedNode> blockedNodes) {
+            Map<String, Set<ResourceID>> taskManagersByNode =
+                    taskManagerToNode.entrySet().stream()
+                            .collect(
+                                    Collectors.groupingBy(
+                                            Map.Entry::getValue,
+                                            Collectors.mapping(
+                                                    Map.Entry::getKey, 
Collectors.toSet())));
+
+            Collection<ResourceID> blockedTaskMangers =

Review Comment:
   Given that it is now iterating over all the registered taskmanagers. We can 
do it in a simpler and possibly more performant way, by checking all the 
taskmanagers to see if it is on the blocked nodes (creating a set for blocked 
nodes first).
   Or we can maintain the `taskManagersByNode` in JobMaster to make the process 
even more performant.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -234,6 +235,31 @@ public boolean releaseTaskManager(ResourceID 
taskManagerId, Exception cause) {
         return false;
     }
 
+    @Override
+    public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, 
Exception cause) {
+        assertHasBeenStarted();
+        if (isTaskManagerRegistered(taskManagerId)) {
+
+            Collection<AllocationID> freeSlots =
+                    declarativeSlotPool.getFreeSlotsInformation().stream()
+                            .filter(
+                                    slotInfo ->
+                                            slotInfo.getTaskManagerLocation()
+                                                    .getResourceID()
+                                                    .equals(taskManagerId))
+                            .map(SlotInfoWithUtilization::getAllocationId)
+                            .collect(Collectors.toSet());
+
+            for (AllocationID allocationID : freeSlots) {

Review Comment:
   allocationID -> allocationId



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -208,6 +217,10 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
 
     private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
 
+    private final Map<ResourceID, String> taskManagerToNode;

Review Comment:
   This is not necessarily needed. We can have a method instead
   ```
   private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
       return 
registeredTaskManagers.get(taskManagerId).getTaskManagerLocation().getNodeId();
   }
   ```
   Or even just create the function when `blocklistHandlerFactory.create(...)` 
is invoked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to