WeiZhong94 commented on code in PR #25734:
URL: https://github.com/apache/flink/pull/25734#discussion_r2199939234


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -112,6 +116,8 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
     // For slots(resources) requests by batch.
     @Nonnull private final Duration slotRequestMaxInterval;
     @Nullable private ScheduledFuture<?> slotRequestFuture;
+    private boolean resourceRequestStable;
+    @Nullable private Long lastRequestMillisOfInterval;

Review Comment:
   This member variable is never used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/LoadableResourceProfile.java:
##########


Review Comment:
   ResourceProfile and LoadingWeight do not seem tightly coupled: LoadingWeight 
is also used on its own in many places. LoadingWeight is also not really a kind 
of resource, so making it a member of an XXXResourceProfile is semantically 
forced. It may be more appropriate to make it a direct member of the various 
Slot abstractions. In that case, a separate abstraction that generalizes over 
them would be redundant, and removing it could simplify the code.
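
   A rough sketch of the alternative, to make the suggestion concrete. All types 
below are simplified, hypothetical stand-ins (not the actual Flink classes), and 
PendingRequest is only one example of a slot-level abstraction that could hold 
the weight directly:

```java
import java.util.Objects;

// Sketch only: every type here is a simplified, hypothetical stand-in,
// not the actual Flink class of the same name.
final class LoadingWeightSketch {

    /** Stand-in for ResourceProfile: describes resources only. */
    static final class ResourceProfile {
        final double cpuCores;
        final int memoryMb;

        ResourceProfile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** Stand-in for LoadingWeight: a load figure, independent of resources. */
    static final class LoadingWeight {
        final float loading;

        LoadingWeight(float loading) {
            this.loading = loading;
        }
    }

    /** A slot-level abstraction that holds both as separate members. */
    static final class PendingRequest {
        private final ResourceProfile resourceProfile;
        private final LoadingWeight loadingWeight;

        PendingRequest(ResourceProfile resourceProfile, LoadingWeight loadingWeight) {
            this.resourceProfile = Objects.requireNonNull(resourceProfile);
            this.loadingWeight = Objects.requireNonNull(loadingWeight);
        }

        ResourceProfile getResourceProfile() {
            return resourceProfile;
        }

        LoadingWeight getLoadingWeight() {
            return loadingWeight;
        }
    }

    public static void main(String[] args) {
        PendingRequest request =
                new PendingRequest(new ResourceProfile(1.0, 1024), new LoadingWeight(2.0f));
        System.out.println(request.getLoadingWeight().loading);
    }
}
```

   With this shape, no wrapper type is needed to pair the two; each abstraction 
that cares about load simply exposes its own getLoadingWeight().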



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java:
##########
@@ -89,13 +112,29 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
         unmatchedRequests.addAll(pendingRequestsWithPreferredAllocations.values());
         if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
             requestSlotMatches.addAll(
-                    SimpleRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(
-                            freeSlots.values(), unmatchedRequests));
+                    rollbackStrategy.matchRequestsAndSlots(
+                            freeSlots.values(), unmatchedRequests, taskExecutorsLoadingWeight));
         }
 
         return requestSlotMatches;
     }
 
+    @VisibleForTesting

Review Comment:
   I think this equals method is well implemented. It is a public method and can 
be used anywhere, so this annotation may be unnecessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -210,34 +223,34 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
             return;
         }
 
-        if (slotBatchAllocatable) {
-            newSlotsAvailableForSlotBatchAllocatable(newSlots);
-        } else {
-            newSlotsAvailableForDirectlyAllocatable(newSlots);
+        if (!slotBatchAllocatable) {

Review Comment:
   The name "slotBatchAllocatable" is potentially misleading, since it might be 
mistaken for indicating support for batch processing mode, rather than 
batch-style slot assignment. Maybe "waitForResourceRequestStable" or 
"deferSlotAllocation" would be better?
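
   For illustration, here is how the branch might read with one of the suggested 
names. This is a hypothetical, simplified stand-in for the bridge: strings 
replace PhysicalSlot, and the behavior of the deferred branch is an assumption, 
not what the PR actually does:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Sketch only: a hypothetical, simplified stand-in for the bridge.
// Strings take the place of PhysicalSlot; the deferred branch is assumed.
final class DeferSlotAllocationSketch {
    // Renamed from slotBatchAllocatable: true means new slots are held back
    // until the resource requests are stable. Unrelated to batch execution mode.
    private final boolean deferSlotAllocation;
    private final List<String> assigned = new ArrayList<>();
    private final List<String> deferred = new ArrayList<>();

    DeferSlotAllocationSketch(boolean deferSlotAllocation) {
        this.deferSlotAllocation = deferSlotAllocation;
    }

    void newSlotsAreAvailable(Collection<String> newSlots) {
        if (!deferSlotAllocation) {
            // Assign slots to pending requests right away.
            assigned.addAll(newSlots);
            return;
        }
        // Hold the slots until the requests have stabilized.
        deferred.addAll(newSlots);
    }

    List<String> getAssigned() {
        return assigned;
    }

    List<String> getDeferred() {
        return deferred;
    }
}
```

   Read this way, the condition states what the pool does with the slots, which 
leaves less room to confuse it with batch job execution.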



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.