dmvk commented on a change in pull request #18238:
URL: https://github.com/apache/flink/pull/18238#discussion_r776614525
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
##########
@@ -103,60 +112,53 @@ public ResourceCounter calculateRequiredSlots(
return Optional.empty();
}
- final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
-
final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new
ArrayList<>();
final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();
+ final Map<SlotSharingGroupId, Set<? extends SlotInfo>> slotsByGroupId =
+ slotAssigner.splitSlotsBetweenSlotSharingGroups(
+ freeSlots, jobInformation.getSlotSharingGroups());
+
for (SlotSharingGroup slotSharingGroup :
jobInformation.getSlotSharingGroups()) {
final List<JobInformation.VertexInformation> containedJobVertices =
slotSharingGroup.getJobVertexIds().stream()
.map(jobInformation::getVertexInformation)
.collect(Collectors.toList());
-
- final Map<JobVertexID, Integer> vertexParallelism =
- determineParallelism(containedJobVertices,
slotsPerSlotSharingGroup);
-
- final Iterable<ExecutionSlotSharingGroup>
sharedSlotToVertexAssignment =
- createExecutionSlotSharingGroups(vertexParallelism);
-
- for (ExecutionSlotSharingGroup executionSlotSharingGroup :
- sharedSlotToVertexAssignment) {
- final SlotInfo slotInfo = slotIterator.next();
-
- assignments.add(
- new
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
- }
- allVertexParallelism.putAll(vertexParallelism);
+ final Map<JobVertexID, Integer> adjustedParallelism =
+ adjustParallelism(containedJobVertices,
slotsPerSlotSharingGroup);
+ final List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment
=
+ createExecutionSlotSharingGroups(adjustedParallelism);
+ final Set<? extends SlotInfo> groupSlots =
+
slotsByGroupId.get(slotSharingGroup.getSlotSharingGroupId());
+ assignments.addAll(slotAssigner.assignSlots(groupSlots,
sharedSlotToVertexAssignment));
+ allVertexParallelism.putAll(adjustedParallelism);
}
+ System.out.println("==== <determine parallelism end> ====");
return Optional.of(new
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
}
- private static Map<JobVertexID, Integer> determineParallelism(
+ private static Map<JobVertexID, Integer> adjustParallelism(
Collection<JobInformation.VertexInformation> containedJobVertices,
int availableSlots) {
final Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
for (JobInformation.VertexInformation jobVertex :
containedJobVertices) {
final int parallelism = Math.min(jobVertex.getParallelism(),
availableSlots);
-
vertexParallelism.put(jobVertex.getJobVertexID(), parallelism);
}
-
return vertexParallelism;
}
- private static Iterable<ExecutionSlotSharingGroup>
createExecutionSlotSharingGroups(
+ private static List<ExecutionSlotSharingGroup>
createExecutionSlotSharingGroups(
Map<JobVertexID, Integer> containedJobVertices) {
final Map<Integer, Set<ExecutionVertexID>>
sharedSlotToVertexAssignment = new HashMap<>();
-
- for (Map.Entry<JobVertexID, Integer> jobVertex :
containedJobVertices.entrySet()) {
- for (int i = 0; i < jobVertex.getValue(); i++) {
- sharedSlotToVertexAssignment
- .computeIfAbsent(i, ignored -> new HashSet<>())
- .add(new ExecutionVertexID(jobVertex.getKey(), i));
- }
- }
-
+ containedJobVertices.forEach(
+ (jobVertexId, parallelism) -> {
+ for (int subtaskIdx = 0; subtaskIdx < parallelism;
subtaskIdx++) {
+ sharedSlotToVertexAssignment
+ .computeIfAbsent(subtaskIdx, ignored -> new
HashSet<>())
+ .add(new ExecutionVertexID(jobVertexId,
subtaskIdx));
+ }
+ });
Review comment:
No special reason; it just felt more readable to have a named "key" and
"value".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]