mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1104885068
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
         return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
     }
-    private static Map<JobVertexID, Integer> determineParallelism(
+    /**
+     * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as
+     * evenly as possible while taking the minimum parallelism of contained vertices into account.
+     */
+    private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
+            JobInformation jobInformation, int freeSlots) {
+        int numUnassignedSlots = freeSlots;
+        int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
+
+        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();
+
+        for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+                sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+            final int groupParallelism =
+                    Math.min(
+                            slotSharingGroup.f1,
+                            numUnassignedSlots / numUnassignedSlotSharingGroups);
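For context, the loop above hands each slot-sharing group an even share of the remaining free slots, capped by `slotSharingGroup.f1` (the group's upper parallelism bound). Below is a minimal standalone sketch of that distribution, using plain ints instead of Flink's `SlotSharingGroup` types; it assumes `sortSlotSharingGroupsByUpperParallelism` returns groups in ascending order of their upper bound, so that slots a small group cannot use flow to the remaining groups:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SlotDistributionSketch {

    static List<Integer> distribute(List<Integer> maxParallelismPerGroup, int freeSlots) {
        // Visit groups in ascending order of their upper parallelism bound.
        List<Integer> sorted = new ArrayList<>(maxParallelismPerGroup);
        sorted.sort(Comparator.naturalOrder());

        int unassignedSlots = freeSlots;
        int unassignedGroups = sorted.size();
        List<Integer> result = new ArrayList<>();
        for (int upperBound : sorted) {
            // Each group gets an even share of the remaining slots, but never
            // more than its upper parallelism bound; unused slots carry over.
            int share = Math.min(upperBound, unassignedSlots / unassignedGroups);
            result.add(share);
            unassignedSlots -= share;
            unassignedGroups--;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 free slots, groups bounded at 2, 4 and 8: the first group is
        // capped at 2, and the leftover 8 slots are split evenly.
        System.out.println(distribute(List.of(2, 4, 8), 10)); // [2, 4, 4]
    }
}
```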
Review Comment:
Thanks for elaborating! I just saw [the proposal](https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management); I had somehow missed it before.
> Then you must already know that these resources can be made available to the job, be it either by spawning new TMs or down-scaling other jobs within a certain amount of time.
We are assuming a Kubernetes environment. It is possible that the k8s cluster is at max capacity, in which case manual intervention will still be needed. However, we absolutely want to prevent jobs from failing to recover from a scaling operation due to a lack of resources. If we could set resource requirements with min/max boundaries, and those were guaranteed before restarting the job, then a resource-constrained environment would not be a problem because existing jobs would simply continue to run.
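To make that guarantee concrete, here is a rough sketch (hypothetical, not actual Flink code; all names are made up) of the check we would want before a restart, under the simplifying assumption of one slot per parallel subtask and no slot sharing:

```java
import java.util.Map;

public class RescaleGuardSketch {

    static boolean canRestartWithinBounds(
            Map<String, Integer> minParallelismPerVertex, int freeSlots) {
        int requiredSlots =
                minParallelismPerVertex.values().stream().mapToInt(Integer::intValue).sum();
        // If the declared minimum of every vertex cannot be met, keep the job
        // running as-is instead of restarting into a state it cannot recover from.
        return requiredSlots <= freeSlots;
    }

    public static void main(String[] args) {
        Map<String, Integer> minima = Map.of("source", 1, "map", 2, "sink", 1);
        System.out.println(canRestartWithinBounds(minima, 3)); // false: needs 4 slots
        System.out.println(canRestartWithinBounds(minima, 8)); // true
    }
}
```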
As for the "when" and "how": Gyula and I have thought quite a bit about effective scaling. The initial tests we have run with customers have yielded promising results. We haven't optimized the checkpoint timing yet, which is something we want to look into as well.