zhuzhurk commented on a change in pull request #15229:
URL: https://github.com/apache/flink/pull/15229#discussion_r742631384
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -40,38 +39,37 @@
private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
+ private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever;
+
MergingSharedSlotProfileRetrieverFactory(
SyncPreferredLocationsRetriever preferredLocationsRetriever,
- Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever) {
+ Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+ Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) {
this.preferredLocationsRetriever =
Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever =
Preconditions.checkNotNull(priorAllocationIdRetriever);
+ this.reservedAllocationIdsRetriever =
+ Preconditions.checkNotNull(reservedAllocationIdsRetriever);
}
@Override
public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
- Set<AllocationID> allPriorAllocationIds =
- bulk.stream()
- .map(priorAllocationIdRetriever)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+ return new MergingSharedSlotProfileRetriever(reservedAllocationIdsRetriever.get(), bulk);
Review comment:
Yes, it is a limitation of local recovery for bounded streaming jobs. For now, I think the change is acceptable because:
- The problem has existed since local recovery was introduced (although it disappeared when pipelined region scheduling was introduced in 1.12, because local recovery restrictions can be broken in these problematic cases).
- At the moment, slot requests can time out even for bounded streaming jobs. So a job is expected to have enough slots for all of its tasks to run at the same time, even if it is a bounded streaming job.
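To make the behavioral change in the diff concrete, here is a minimal, hypothetical Java sketch. It assumes plain string IDs in place of `ExecutionVertexID`/`AllocationID` and reduces slot selection to "reuse the prior slot if it is free, otherwise take any free slot that is not reserved"; `ReservedAllocationsSketch`, `reservedFromBulk`, `reservedFromSupplier` and `pickSlot` are illustrative names, not Flink APIs. Only the two ways of computing the reserved set mirror the removed and added lines.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Hypothetical, simplified model: IDs are strings and slot selection is reduced to
 * "take the prior slot if free, else any free slot that is not reserved".
 */
final class ReservedAllocationsSketch {

    // Before the change: only prior allocations of vertices in the current bulk are reserved.
    static Set<String> reservedFromBulk(
            Set<String> bulk, Function<String, String> priorAllocationIdRetriever) {
        return bulk.stream()
                .map(priorAllocationIdRetriever)
                .filter(Objects::nonNull)
                .collect(Collectors.toSet());
    }

    // After the change: every reserved allocation known to the scheduler, whatever the bulk.
    static Set<String> reservedFromSupplier(Supplier<Set<String>> reservedAllocationIdsRetriever) {
        return reservedAllocationIdsRetriever.get();
    }

    // Simplified selection: reuse the prior slot if free, otherwise any free, non-reserved slot.
    static Optional<String> pickSlot(List<String> freeSlots, String prior, Set<String> reserved) {
        if (prior != null && freeSlots.contains(prior)) {
            return Optional.of(prior);
        }
        return freeSlots.stream().filter(slot -> !reserved.contains(slot)).findFirst();
    }

    public static void main(String[] args) {
        // v1 previously ran in slot a1, v2 in slot a2; only a2 is free right now.
        Map<String, String> prior = Map.of("v1", "a1", "v2", "a2");
        Set<String> bulk = Set.of("v1");
        List<String> freeSlots = List.of("a2");

        Set<String> oldReserved = reservedFromBulk(bulk, prior::get);
        Set<String> newReserved = reservedFromSupplier(() -> Set.copyOf(prior.values()));

        System.out.println(pickSlot(freeSlots, prior.get("v1"), oldReserved)); // Optional[a2]
        System.out.println(pickSlot(freeSlots, prior.get("v1"), newReserved)); // Optional.empty
    }
}
```

In this sketch, the old behavior lets `v1` take `a2` and silently breaks local recovery for `v2`, while the new behavior keeps `a2` for `v2`, so the request for `v1` can only wait. That is why enough slots for all tasks are needed.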
Later, we can improve this for bounded streaming jobs:
1. First, we need to identify bounded streaming jobs. To achieve this, as you mentioned, we need to derive the `slotWillBeOccupiedIndefinitely` property from the operators.
2. For a bounded streaming job, the scheduler can try the local recovery path first when allocating slots, so that local recovery still works in most cases. If slot allocation fails due to insufficient slots, while a global check shows that there are enough slots for some of the regions to run, the scheduler can clean up the reserved allocations before rescheduling the tasks.
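The two steps above could be sketched roughly as follows. This is a hypothetical illustration, not a proposed implementation: the `slotWillBeOccupiedIndefinitely` flag is assumed to be already derived from the operators (step 1), the "global check" is simplified to "the region would fit into the free slots if reservations were ignored", and `BoundedStreamingFallbackSketch`, `allocate` and `schedule` are made-up names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch of the proposed scheduling for bounded streaming jobs.
 * Names and the simplified global check are illustrative only.
 */
final class BoundedStreamingFallbackSketch {

    // Give each vertex its prior slot if free, else any free slot not reserved; empty on failure.
    static Optional<Map<String, String>> allocate(
            List<String> vertices,
            Map<String, String> prior,
            Set<String> reserved,
            List<String> freeSlots) {
        List<String> available = new ArrayList<>(freeSlots);
        Map<String, String> assignment = new LinkedHashMap<>();
        for (String vertex : vertices) {
            String preferred = prior.get(vertex);
            String chosen = null;
            if (preferred != null && available.contains(preferred)) {
                chosen = preferred; // local recovery: reuse the previous slot
            } else {
                for (String slot : available) {
                    if (!reserved.contains(slot)) {
                        chosen = slot;
                        break;
                    }
                }
            }
            if (chosen == null) {
                return Optional.empty(); // not enough usable slots
            }
            available.remove(chosen);
            assignment.put(vertex, chosen);
        }
        return Optional.of(assignment);
    }

    static Optional<Map<String, String>> schedule(
            List<String> region,
            Map<String, String> prior,
            Set<String> reserved,
            List<String> freeSlots,
            boolean slotWillBeOccupiedIndefinitely) {
        // Step 2: try the local recovery path first, honoring all reservations.
        Optional<Map<String, String>> result = allocate(region, prior, reserved, freeSlots);
        if (result.isPresent() || slotWillBeOccupiedIndefinitely) {
            return result; // unbounded jobs keep the current behavior
        }
        // Simplified global check: the region fits if reservations are ignored.
        if (freeSlots.size() >= region.size()) {
            return allocate(region, prior, Set.of(), freeSlots); // drop reservations, retry
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> prior = Map.of("v1", "a1", "v2", "a2");
        Set<String> reserved = Set.of("a1", "a2");
        List<String> region = List.of("v1");
        List<String> freeSlots = List.of("a2");

        System.out.println(schedule(region, prior, reserved, freeSlots, false)); // Optional[{v1=a2}]
        System.out.println(schedule(region, prior, reserved, freeSlots, true)); // Optional.empty
    }
}
```

With only `a2` free, the bounded job falls back to `a2` after dropping the reservations, while the unbounded job keeps the reservation and gets no slot.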