zentol commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1104792362
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
         return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
     }
-    private static Map<JobVertexID, Integer> determineParallelism(
+    /**
+     * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as
+     * evenly as possible while taking the minimum parallelism of contained vertices into account.
+     */
+    private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
+            JobInformation jobInformation, int freeSlots) {
+        int numUnassignedSlots = freeSlots;
+        int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
+
+        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();
+
+        for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+                sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+            final int groupParallelism =
+                    Math.min(
+                            slotSharingGroup.f1,
+                            numUnassignedSlots / numUnassignedSlotSharingGroups);
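To make the even-distribution idea in the diff concrete, here is a standalone sketch (class and parameter names are hypothetical, not Flink's actual API): groups are visited in ascending order of their parallelism cap, and each receives at most its fair share of the remaining slots, so surplus from capped groups flows to larger ones.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EvenSlotDistribution {

    /**
     * Hypothetical sketch: distribute {@code freeSlots} across groups as evenly
     * as possible, never exceeding a group's parallelism cap. Visiting groups
     * in ascending cap order lets slots a small group cannot use flow to the
     * remaining (larger) groups.
     */
    static Map<String, Integer> distribute(Map<String, Integer> maxParallelismPerGroup, int freeSlots) {
        int unassignedSlots = freeSlots;
        int unassignedGroups = maxParallelismPerGroup.size();

        // Sort by cap ascending, mirroring sortSlotSharingGroupsByUpperParallelism.
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(maxParallelismPerGroup.entrySet());
        sorted.sort(Map.Entry.comparingByValue());

        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : sorted) {
            // Fair share of what's left, bounded by this group's cap.
            int share = Math.min(group.getValue(), unassignedSlots / unassignedGroups);
            result.put(group.getKey(), share);
            unassignedSlots -= share;
            unassignedGroups--;
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("g1", 1); // capped at 1, releases its surplus
        groups.put("g2", 4);
        groups.put("g3", 4);
        // 6 slots over 3 groups: g1 takes 1, g2 takes 2, g3 takes the remaining 3
        System.out.println(distribute(groups, 6));
    }
}
```

With 6 slots and caps {1, 4, 4}, the naive even split of 2 each would waste a slot on the capped group; the ascending-cap order instead yields {1, 2, 3}.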
Review Comment:
One assumption we're making with the proposed rescale API is that when you use the API with the intent of scaling up to _exactly_ X, you must already _know_ that these resources can be made available to the job within a certain amount of time, be it by spawning new TMs or by down-scaling other jobs.
This would fail to hold only for temporary issues (say, a TM crashing at just the right/wrong time) that would be remedied eventually, and thus could be covered by the stabilization timeout. (It also seems somewhat odd to handle this case so differently from when an in-use TM crashes and forces a down-scaling.)
Anyhow, this is largely irrelevant to the PR, and I'd suggest raising these points in the FLIP discussion. IIRC this also came up in our internal discussions, and we concluded that this _may_ be achievable with a `lowerBound` parallelism, IFF a job currently running below the minimum wouldn't be interrupted. Basically, we consider this a potential follow-up.
"When" to _apply_ a rescaling (in terms of time, e.g. shortly after a checkpoint, or in terms of value, i.e. the cost of rescaling vs. its benefit) is a topic we're quite interested in.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]