mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1104885068


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing> 
determineParallelism(
         return Optional.of(new 
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
     }
 
-    private static Map<JobVertexID, Integer> determineParallelism(
+    /**
+     * Distributes free slots across the slot-sharing groups of the job. Slots 
are distributed as
+     * evenly as possible while taking the minimum parallelism of contained 
vertices into account.
+     */
+    private static Map<SlotSharingGroupId, Integer> 
determineSlotsPerSharingGroup(
+            JobInformation jobInformation, int freeSlots) {
+        int numUnassignedSlots = freeSlots;
+        int numUnassignedSlotSharingGroups = 
jobInformation.getSlotSharingGroups().size();
+
+        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = 
new HashMap<>();
+
+        for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+                sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+            final int groupParallelism =
+                    Math.min(
+                            slotSharingGroup.f1,
+                            numUnassignedSlots / 
numUnassignedSlotSharingGroups);

Review Comment:
   Thanks for elaborating! I just saw [the 
proposal](https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management),
 I had somehow missed it before.
   
   >Then you must already know that these resources can be made available to 
the job, be it either by spawning new TMs or down-scaling other jobs within a 
certain amount of time.
   
   We are assuming a Kubernetes environment. It is possible that the k8s 
cluster is at max capacity; we will still need manual intervention in this 
case. However, we absolutely want to prevent jobs not recovering from a scaling 
operation due to lack of resources. If we could set resource requirements with 
min/max boundaries and those are guaranteed before restarting the job, then 
there won't be an issue in case of a resource-constrained environment because 
existing jobs continue to run. 
   
   Considering the "when" and "how", Gyula and I have thought quite a bit about 
effective scaling. The initial tests we have run with customers have yielded 
promising results. We haven't optimized the checkpoint timing yet which is 
something we want to look into as well.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to