zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917245184
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1449,4 +1491,42 @@ public Void retrievePayload(ResourceID resourceID) {
return null;
}
}
+
+ private class JobMasterBlocklistContext implements BlocklistContext {
+
+ @Override
+ public void blockResources(Collection<BlockedNode> blockedNodes) {
+ Map<String, Set<ResourceID>> taskManagersByNode =
+ taskManagerToNode.entrySet().stream()
+ .collect(
+ Collectors.groupingBy(
+ Map.Entry::getValue,
+ Collectors.mapping(
+ Map.Entry::getKey,
Collectors.toSet())));
+
+ Collection<ResourceID> blockedTaskMangers =
Review Comment:
Given that it is now iterating over all the registered taskmanagers. We can
do it in a simpler and possibly more performant way, by checking all the
taskmanagers to see if it is on the blocked nodes (creating a set for blocked
nodes first).
Or we can maintain the `taskManagersByNode` in JobMaster to make the process
even more performant.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -234,6 +235,31 @@ public boolean releaseTaskManager(ResourceID
taskManagerId, Exception cause) {
return false;
}
+ @Override
+ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId,
Exception cause) {
+ assertHasBeenStarted();
+ if (isTaskManagerRegistered(taskManagerId)) {
+
+ Collection<AllocationID> freeSlots =
+ declarativeSlotPool.getFreeSlotsInformation().stream()
+ .filter(
+ slotInfo ->
+ slotInfo.getTaskManagerLocation()
+ .getResourceID()
+ .equals(taskManagerId))
+ .map(SlotInfoWithUtilization::getAllocationId)
+ .collect(Collectors.toSet());
+
+ for (AllocationID allocationID : freeSlots) {
Review Comment:
allocationID -> allocationId
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -208,6 +217,10 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
+ private final Map<ResourceID, String> taskManagerToNode;
Review Comment:
This is not necessarily needed. We can have a method instead
```
private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
return
registeredTaskManagers.get(taskManagerId).getTaskManagerLocation().getNodeId();
}
```
Or even just create the function when `blocklistHandlerFactory.create(...)`
is invoked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]