zentol commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1104792362


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
         return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
     }
 
-    private static Map<JobVertexID, Integer> determineParallelism(
+    /**
+     * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as
+     * evenly as possible while taking the minimum parallelism of contained vertices into account.
+     */
+    private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
+            JobInformation jobInformation, int freeSlots) {
+        int numUnassignedSlots = freeSlots;
+        int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
+
+        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();
+
+        for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+                sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+            final int groupParallelism =
+                    Math.min(
+                            slotSharingGroup.f1,
+                            numUnassignedSlots / numUnassignedSlotSharingGroups);
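
The distribution scheme in the hunk above can be illustrated with a standalone sketch (not the Flink code itself; the class, method, and map-based group representation below are hypothetical stand-ins): each group, visited in ascending order of its maximum parallelism, takes at most `remainingSlots / remainingGroups`, so a small group that cannot use its full share returns the leftover slots to the pool for the larger groups that follow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the even slot-distribution idea from the PR.
 * Groups must be iterated in ascending order of their max parallelism,
 * mirroring sortSlotSharingGroupsByUpperParallelism.
 */
public class SlotDistributionSketch {

    static Map<String, Integer> distribute(
            Map<String, Integer> maxParallelismPerGroup, int freeSlots) {
        int unassignedSlots = freeSlots;
        int unassignedGroups = maxParallelismPerGroup.size();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : maxParallelismPerGroup.entrySet()) {
            // Each group gets at most its fair share of what is still unassigned,
            // capped by the group's own maximum parallelism.
            int share = Math.min(group.getValue(), unassignedSlots / unassignedGroups);
            result.put(group.getKey(), share);
            unassignedSlots -= share;
            unassignedGroups--;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 free slots, two groups: "small" is capped at 2, so "large"
        // absorbs the slots that "small" could not use.
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("small", 2);
        groups.put("large", 10);
        System.out.println(distribute(groups, 10)); // {small=2, large=8}
    }
}
```

Visiting groups smallest-first is what makes the leftover redistribution work: a capped group releases its unused share before the larger groups compute theirs.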

Review Comment:
   One assumption that we're making with the proposed rescale API is that when you use the API with the intent of scaling up to _exactly_ X, then you must already _know_ that these resources can be made available to the job within a certain amount of time, be it by spawning new TMs or by down-scaling other jobs.
   
   The only exception would be temporary issues (say, a TM crash at just the right/wrong time) that would be remedied eventually, and thus could be covered by the stabilization timeout. (It also seems somewhat odd to handle this case so differently from when an in-use TM crashes and forces a down-scaling.)
   
   Anyhow, this is largely irrelevant to the PR, and I'd suggest raising these points in the FLIP discussion. IIRC this also came up in our internal discussions, and we concluded that this _may_ be achievable with a `lowerBound` parallelism, IFF a job currently running below the minimum wouldn't be interrupted. Basically, we consider this a potential follow-up.
   
   "When" to _apply_ a rescaling (in terms of time, like shortly after a checkpoint, or in terms of value, aka the cost of rescaling vs. its benefit) is a topic that we're quite interested in.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
