mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1103091640
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing>
determineParallelism(
return Optional.of(new
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
}
- private static Map<JobVertexID, Integer> determineParallelism(
+ /**
+ * Distributes free slots across the slot-sharing groups of the job. Slots
are distributed as
+ * evenly as possible while taking the minimum parallelism of contained
vertices into account.
+ */
+ private static Map<SlotSharingGroupId, Integer>
determineSlotsPerSharingGroup(
+ JobInformation jobInformation, int freeSlots) {
+ int numUnassignedSlots = freeSlots;
+ int numUnassignedSlotSharingGroups =
jobInformation.getSlotSharingGroups().size();
+
+ final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
new HashMap<>();
+
+ for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+ sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+ final int groupParallelism =
+ Math.min(
+ slotSharingGroup.f1,
+ numUnassignedSlots /
numUnassignedSlotSharingGroups);
Review Comment:
That is terrible from an autoscaler perspective, as we want full control
over the scaling. We never want the adaptive scheduler to reduce the
parallelism in any way. Instead, we provide the spec and the scheduler has to
rescale the job safely. It either fulfils the rescaling request, or it does
nothing. We do not have unlimited retries for rescaling. Rescaling is costly
especially for stateful workloads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]