WeiZhong94 commented on code in PR #25734:
URL: https://github.com/apache/flink/pull/25734#discussion_r2199939234
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -112,6 +116,8 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
// For slots(resources) requests by batch.
@Nonnull private final Duration slotRequestMaxInterval;
@Nullable private ScheduledFuture<?> slotRequestFuture;
+ private boolean resourceRequestStable;
+ @Nullable private Long lastRequestMillisOfInterval;
Review Comment:
This member variable is never used.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/LoadableResourceProfile.java:
##########
Review Comment:
ResourceProfile and LoadingWeight do not seem to be tightly coupled:
LoadingWeight is also used independently in many places. In addition,
LoadingWeight is not really a type of resource, so including it as a member of
XXXResourceProfile is somewhat forced semantically. It might be more
appropriate to make it a direct member of the various Slot abstractions. In
that case, a separate abstraction that generalizes them might be redundant,
and removing it could simplify the code.
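A minimal sketch of this suggestion, under stated assumptions: every type below (`LoadingWeightSketch`, `FixedLoadingWeight`, `ResourceProfileSketch`, `SlotSketch`) is a hypothetical stand-in invented for illustration, not the real Flink class or the code in this PR. It only shows the shape of keeping the loading weight as a direct member of a slot, next to the resource profile, instead of folding both into a combined profile type.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Flink classes.
interface LoadingWeightSketch {
    float getLoading();
}

final class FixedLoadingWeight implements LoadingWeightSketch {
    private final float loading;

    FixedLoadingWeight(float loading) {
        this.loading = loading;
    }

    @Override
    public float getLoading() {
        return loading;
    }
}

// Describes resources only; it knows nothing about load.
final class ResourceProfileSketch {
    private final double cpuCores;
    private final int memoryMb;

    ResourceProfileSketch(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    double getCpuCores() {
        return cpuCores;
    }

    int getMemoryMb() {
        return memoryMb;
    }
}

// The slot holds both values as independent members,
// so no combined "loadable profile" wrapper type is needed.
final class SlotSketch {
    private final ResourceProfileSketch resourceProfile;
    private final LoadingWeightSketch loadingWeight;

    SlotSketch(ResourceProfileSketch resourceProfile, LoadingWeightSketch loadingWeight) {
        this.resourceProfile = resourceProfile;
        this.loadingWeight = loadingWeight;
    }

    ResourceProfileSketch getResourceProfile() {
        return resourceProfile;
    }

    LoadingWeightSketch getLoadingWeight() {
        return loadingWeight;
    }
}
```

With this shape, code that only cares about resources keeps working against the plain profile, and code that cares about load reads it from the slot.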
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java:
##########
@@ -89,13 +112,29 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
unmatchedRequests.addAll(pendingRequestsWithPreferredAllocations.values());
if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
requestSlotMatches.addAll(
- SimpleRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(
- freeSlots.values(), unmatchedRequests));
+ rollbackStrategy.matchRequestsAndSlots(
+ freeSlots.values(), unmatchedRequests, taskExecutorsLoadingWeight));
}
return requestSlotMatches;
}
+ @VisibleForTesting
Review Comment:
I think this equals method is well implemented. It is a public method and can
be used anywhere, so the @VisibleForTesting annotation may be unnecessary.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -210,34 +223,34 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
return;
}
- if (slotBatchAllocatable) {
- newSlotsAvailableForSlotBatchAllocatable(newSlots);
- } else {
- newSlotsAvailableForDirectlyAllocatable(newSlots);
+ if (!slotBatchAllocatable) {
Review Comment:
The name "slotBatchAllocatable" is potentially misleading, since it might be
mistaken for indicating support for batch processing mode, rather than
batch-style slot assignment. Maybe "waitForResourceRequestStable" or
"deferSlotAllocation" would be better?
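To show how the proposed name might read at the call site, here is a toy sketch. The class `SlotPoolBridgeSketch`, its string-based slots, and the `onResourceRequestStable` callback are all hypothetical and do not reflect the actual DeclarativeSlotPoolBridge logic; only the flag name `deferSlotAllocation` comes from the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT the real DeclarativeSlotPoolBridge: slots are plain strings.
final class SlotPoolBridgeSketch {
    // Proposed name: says that assignment is postponed, with no hint of "batch mode".
    private final boolean deferSlotAllocation;
    private final List<String> pendingSlots = new ArrayList<>();
    private final List<String> assignedSlots = new ArrayList<>();

    SlotPoolBridgeSketch(boolean deferSlotAllocation) {
        this.deferSlotAllocation = deferSlotAllocation;
    }

    void newSlotsAreAvailable(List<String> newSlots) {
        if (!deferSlotAllocation) {
            // Assign immediately when allocation is not deferred.
            assignedSlots.addAll(newSlots);
            return;
        }
        // Otherwise hold the slots until the resource request is considered stable.
        pendingSlots.addAll(newSlots);
    }

    // Hypothetical callback: assigns everything that was held back.
    void onResourceRequestStable() {
        assignedSlots.addAll(pendingSlots);
        pendingSlots.clear();
    }

    int assignedCount() {
        return assignedSlots.size();
    }

    int pendingCount() {
        return pendingSlots.size();
    }
}
```

Read this way, `if (!deferSlotAllocation)` describes the timing of slot assignment and is harder to confuse with batch execution mode.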