gyfora commented on code in PR #22883:
URL: https://github.com/apache/flink/pull/22883#discussion_r1244836071
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java:
##########
@@ -23,6 +23,9 @@
* change during runtime.
*/
public interface VertexParallelismInformation {
+
+ int getMinParallelism();
Review Comment:
Missing Javadoc comment for `getMinParallelism()`.
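One possible shape for the requested Javadoc is sketched below on a hypothetical, trimmed-down copy of the interface. The wording, the assumed invariant `1 <= minParallelism <= parallelism` (inferred from how the allocator later in this PR subtracts the minimum from the desired parallelism), and the `SimpleParallelism` record are illustrative assumptions, not Flink's actual code.

```java
/** Hypothetical, trimmed-down interface used only to illustrate the Javadoc. */
interface VertexParallelismInformation {

    /**
     * Returns the minimum parallelism of the vertex. The scheduler is assumed never to run the
     * vertex with fewer parallel subtasks than this value.
     *
     * @return the minimum parallelism, assumed to be at least 1 and at most {@link
     *     #getParallelism()}
     */
    int getMinParallelism();

    /**
     * Returns the current parallelism of the vertex.
     *
     * @return the current parallelism
     */
    int getParallelism();
}

/** Hypothetical immutable implementation that enforces the assumed invariant. */
record SimpleParallelism(int minParallelism, int parallelism)
        implements VertexParallelismInformation {

    SimpleParallelism {
        // Reject values that violate the invariant documented on getMinParallelism().
        if (minParallelism < 1 || minParallelism > parallelism) {
            throw new IllegalArgumentException(
                    "minParallelism must be in [1, " + parallelism + "], got " + minParallelism);
        }
    }

    @Override
    public int getMinParallelism() {
        return minParallelism;
    }

    @Override
    public int getParallelism() {
        return parallelism;
    }
}
```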
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -78,31 +80,54 @@ public ResourceCounter calculateRequiredSlots(
return ResourceCounter.withResource(ResourceProfile.UNKNOWN, numTotalRequiredSlots);
}
- private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
- Iterable<JobInformation.VertexInformation> vertices) {
- final Map<SlotSharingGroupId, Integer> maxParallelismForSlotSharingGroups = new HashMap<>();
+ private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(
+ Iterable<JobInformation.VertexInformation> vertices,
+ Function<JobInformation.VertexInformation, T> mapper,
+ BiFunction<T, T, T> reducer) {
+ final Map<SlotSharingGroupId, T> extractedPerSlotSharingGroups = new HashMap<>();
for (JobInformation.VertexInformation vertex : vertices) {
- maxParallelismForSlotSharingGroups.compute(
+ extractedPerSlotSharingGroups.compute(
vertex.getSlotSharingGroup().getSlotSharingGroupId(),
- (slotSharingGroupId, currentMaxParallelism) ->
- currentMaxParallelism == null
- ? vertex.getParallelism()
- : Math.max(currentMaxParallelism, vertex.getParallelism()));
+ (slotSharingGroupId, currentData) ->
+ currentData == null
+ ? mapper.apply(vertex)
+ : reducer.apply(currentData, mapper.apply(vertex)));
}
- return maxParallelismForSlotSharingGroups;
+ return extractedPerSlotSharingGroups;
+ }
+
+ private static Map<SlotSharingGroupId, Integer> getMinParallelismPerSlotSharingGroup(
+ Iterable<JobInformation.VertexInformation> vertices) {
+ return getPerSlotSharingGroups(
+ vertices, JobInformation.VertexInformation::getMinParallelism, Math::min);
+ }
+
+ private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
+ Iterable<JobInformation.VertexInformation> vertices) {
+ return getPerSlotSharingGroups(
+ vertices, JobInformation.VertexInformation::getParallelism, Math::max);
Review Comment:
Isn't this supposed to be `getMaxParallelism` here?
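A minimal, self-contained sketch of the per-group aggregation makes the question concrete. It assumes a hypothetical `Vertex` record standing in for `JobInformation.VertexInformation` (with plain string group ids instead of `SlotSharingGroupId`) and uses `Map.merge` in place of `compute`, which behaves the same here because the mapped values are never null. Folding `parallelism` with `Math::max` yields the largest configured parallelism in each group, while folding `maxParallelism` yields the largest upper bound, so the two choices differ whenever a vertex's max parallelism exceeds its parallelism.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class PerGroupAggregationSketch {

    /** Hypothetical stand-in for JobInformation.VertexInformation. */
    record Vertex(String slotSharingGroupId, int minParallelism, int parallelism, int maxParallelism) {}

    /** Maps every vertex to a value and folds the values of vertices in the same group. */
    static <T> Map<String, T> perSlotSharingGroup(
            Iterable<Vertex> vertices, Function<Vertex, T> mapper, BiFunction<T, T, T> reducer) {
        Map<String, T> result = new HashMap<>();
        for (Vertex vertex : vertices) {
            // merge() stores the mapped value for a new group, otherwise combines it with the stored one.
            result.merge(vertex.slotSharingGroupId(), mapper.apply(vertex), reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Vertex> vertices =
                List.of(
                        new Vertex("g1", 1, 4, 128),
                        new Vertex("g1", 2, 8, 16),
                        new Vertex("g2", 1, 3, 3));
        Map<String, Integer> minPerGroup =
                perSlotSharingGroup(vertices, Vertex::minParallelism, Math::min);
        Map<String, Integer> maxOfParallelism =
                perSlotSharingGroup(vertices, Vertex::parallelism, Math::max);
        Map<String, Integer> maxOfMaxParallelism =
                perSlotSharingGroup(vertices, Vertex::maxParallelism, Math::max);
        System.out.println(minPerGroup + " " + maxOfParallelism + " " + maxOfMaxParallelism);
    }
}
```

For group `g1` the sketch reports 8 when folding `parallelism` and 128 when folding `maxParallelism`, which is exactly the difference the question hinges on.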
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -150,21 +175,31 @@ public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
* distributed over the remaining groups.
*/
private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
- JobInformation jobInformation, int freeSlots) {
+ JobInformation jobInformation,
+ int freeSlots,
+ int minRequiredSlots,
+ Map<SlotSharingGroupId, Integer> minSlotsPerSlotSharingGroup) {
+
int numUnassignedSlots = freeSlots;
int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
+ int numMinSlotsRequiredByRemainingGroups = minRequiredSlots;
final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();
for (Map.Entry<SlotSharingGroupId, Integer> slotSharingGroup :
sortSlotSharingGroupsByDesiredParallelism(jobInformation)) {
+ final int minParallelism = minSlotsPerSlotSharingGroup.get(slotSharingGroup.getKey());
+
final int groupParallelism =
- Math.min(
- slotSharingGroup.getValue(),
- numUnassignedSlots / numUnassignedSlotSharingGroups);
+ minParallelism
+ + Math.min(
+ slotSharingGroup.getValue() - minParallelism,
+ (numUnassignedSlots - numMinSlotsRequiredByRemainingGroups)
+ / numUnassignedSlotSharingGroups);
Review Comment:
It would be great to have a comment here explaining this calculation, in particular why there are two ways to compute the value (and why we take the minimum of them), because it's not obvious at first glance.
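A small self-contained sketch may help with writing that comment. As read from the diff, each group is always granted its minimum parallelism; on top of that it gets an even share of the "spare" slots (free slots not already reserved for the minimums of the groups still to be processed), and the `Math.min` caps that extra amount so the group never exceeds its desired parallelism. The `Group` record, the string ids, the ascending processing order (assumed to mirror `sortSlotSharingGroupsByDesiredParallelism`), the counter updates at the end of the loop (not visible in the hunk), and the preconditions `minParallelism <= desiredParallelism` and enough free slots for all minimums are all assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SlotDistributionSketch {

    /** Hypothetical stand-in for a slot sharing group with its minimum and desired parallelism. */
    record Group(String id, int minParallelism, int desiredParallelism) {}

    static Map<String, Integer> determineSlotsPerGroup(List<Group> groups, int freeSlots) {
        int minRequiredSlots = groups.stream().mapToInt(Group::minParallelism).sum();
        if (freeSlots < minRequiredSlots) {
            throw new IllegalArgumentException(
                    "Need at least " + minRequiredSlots + " slots but only " + freeSlots + " are free");
        }
        // Assumption: groups are handled in ascending order of desired parallelism.
        List<Group> sorted = new ArrayList<>(groups);
        sorted.sort(Comparator.comparingInt(Group::desiredParallelism));

        int numUnassignedSlots = freeSlots;
        int numUnassignedGroups = sorted.size();
        int numMinSlotsRequiredByRemainingGroups = minRequiredSlots;
        Map<String, Integer> parallelismPerGroup = new LinkedHashMap<>();
        for (Group group : sorted) {
            // Slots left over after every remaining group (this one included) got its minimum.
            int spareSlots = numUnassignedSlots - numMinSlotsRequiredByRemainingGroups;
            // Minimum is always granted; the extra is an even share of the spare slots,
            // capped so the group never exceeds its desired parallelism.
            int parallelism =
                    group.minParallelism()
                            + Math.min(
                                    group.desiredParallelism() - group.minParallelism(),
                                    spareSlots / numUnassignedGroups);
            parallelismPerGroup.put(group.id(), parallelism);
            // Assumed bookkeeping, not visible in the diff hunk.
            numUnassignedSlots -= parallelism;
            numMinSlotsRequiredByRemainingGroups -= group.minParallelism();
            numUnassignedGroups--;
        }
        return parallelismPerGroup;
    }

    public static void main(String[] args) {
        List<Group> groups =
                List.of(new Group("a", 1, 2), new Group("b", 2, 10), new Group("c", 1, 10));
        System.out.println(determineSlotsPerGroup(groups, 10));
    }
}
```

With 10 free slots and groups `a` (min 1, desired 2), `b` (min 2, desired 10) and `c` (min 1, desired 10), the sketch assigns 2, 4 and 4 slots: `a` is capped at its desired parallelism, and the slot it does not take is left for `b` and `c`. With every minimum set to 0, the formula reduces to the previous `Math.min(desired, numUnassignedSlots / numUnassignedSlotSharingGroups)`.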