zentol opened a new pull request #13722:
URL: https://github.com/apache/flink/pull/13722
Adds the DeclarativeSlotPool, a slot pool supporting declarative resource management. For the time being, this pool will not be used directly, as that would require proper integration into the scheduler. Instead, a wrapper will be added in FLINK-19314 that bridges the existing SlotRequest-based protocol to the declarative one.

### DeclarativeSlotPool

This pool no longer accepts slot requests, only increments and decrements of the requirements. It accumulates these into total requirements, which are announced to the ResourceManager through a listener interface.

Slots that are offered by TaskExecutors are matched against these requirements; any slot that does not match a requirement is rejected.

The scheduler can then make use of these slots by iterating over the free slots returned by the pool and reserving them. A slot remains reserved until it is either a) freed by the scheduler, in which case it remains in the pool for future use, or b) explicitly released, usually due to a failed allocation or a TaskExecutor shutting down.

Freed slots remain in the pool as long as they correspond to a requirement; a slot that has been free for longer than the idleTimeout and is no longer required will be released. This check is triggered periodically from the outside.

As mentioned above, when slots are offered to the SlotPool they are matched against the declared requirements to figure out whether we actually need the slot. However, this mapping between slots and fulfilled requirements is mostly theoretical; the scheduler is free to use a slot to fulfill another requirement, e.g., because the locality of the slot makes it a desirable choice, despite potentially wasting resources.

To prevent deadlocks where the scheduler reserves slots in such a way that the requirements can no longer be fulfilled, the SlotPool automatically adjusts the requirements.

As an example, assume we have 2 types of resource profiles (small/large), where a large slot can also be used for the small profile.
The job requires 1 of each; the slot pool is offered one of each and performs a perfect match between the offered slots and the requirements:

```
Requirements:
  large: 1
  small: 1

Mapping:
  largeSlot <-> largeRequirement
  smallSlot <-> smallRequirement
```

The scheduler may now decide to use the large slot to fulfill the small requirement. Without any further action, the job would not be able to run, since it is missing a large slot. We prevent this by adjusting the requirements such that we need 1 more large slot (since we used up the other one) and 1 less small slot (since we fulfilled this requirement with the large one). The currently allocated small slot is hence no longer needed and could be released:

```
Requirements:
  large: 2
  small: 0

Mapping:
  largeSlot <-> smallRequirement
```

Once the second large slot is allocated and offered to the pool, it will be matched against the outstanding large requirement, and the job can start running.

### ResourceCounter

The ResourceCounter is an immutable version of the existing `org.apache.flink.runtime.slots.ResourceCounter`; it is immutable because it is exposed at various points in the API.

### AutoRequirementDecrementingSlotPoolWrapper

The current resource allocation protocol will be bridged to the declarative one by mapping SlotRequests to requirement increases, and slot freeing/releases to requirement reductions. The increments will be done explicitly in the wrapper that we'll introduce in FLINK-19314, whereas the reductions will be handled in the `AutoRequirementDecrementingSlotPoolWrapper`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
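The requirement bookkeeping described in the DeclarativeSlotPool section (the pool only receives increments and decrements, accumulates them, and announces the totals through a listener) can be sketched as follows. This is a hypothetical illustration, not Flink's API: `RequirementAccumulator`, `increase` and `decrease` are invented names, resource profiles are plain strings, and a `Consumer` stands in for the listener that informs the ResourceManager.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: profiles are plain strings (not Flink's ResourceProfile), and the
// Consumer stands in for the listener that announces requirements to the ResourceManager.
final class RequirementAccumulator {
    private final Map<String, Integer> totals = new HashMap<>();
    private final Consumer<Map<String, Integer>> requirementsListener;

    RequirementAccumulator(Consumer<Map<String, Integer>> requirementsListener) {
        this.requirementsListener = requirementsListener;
    }

    // The pool only receives deltas; it never sees individual slot requests.
    void increase(String profile, int count) {
        totals.merge(profile, count, Integer::sum);
        announceTotals();
    }

    void decrease(String profile, int count) {
        int remaining = totals.getOrDefault(profile, 0) - count;
        if (remaining < 0) {
            throw new IllegalStateException("Requirements for " + profile + " would become negative");
        }
        if (remaining == 0) {
            totals.remove(profile);
        } else {
            totals.put(profile, remaining);
        }
        announceTotals();
    }

    // Returns a read-only snapshot of the accumulated totals.
    Map<String, Integer> getTotalRequirements() {
        return Collections.unmodifiableMap(new HashMap<>(totals));
    }

    // The listener always receives the accumulated totals, not the delta that caused the change.
    private void announceTotals() {
        requirementsListener.accept(getTotalRequirements());
    }
}
```

With two increments of `small` (by 2, then by 1) and one decrement by 1, the listener is called three times and sees totals of 2, 3 and 2; the receiving side never has to replay individual deltas.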
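Matching offered slots against the declared requirements, and rejecting slots that match none, might look roughly like this. It is a sketch under stated assumptions: `SlotOfferMatcher` and `canFulfill` are hypothetical, only the small/large relation from the example is modelled, and the "prefer an exact match" policy is an illustrative choice rather than a description of the actual pool's matching strategy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: profiles are strings, and only the small/large example relation is modelled.
final class SlotOfferMatcher {
    // Requirements that no offered slot has been matched to yet.
    private final Map<String, Integer> outstanding;

    SlotOfferMatcher(Map<String, Integer> requirements) {
        this.outstanding = new HashMap<>(requirements);
    }

    // Assumed compatibility rule: a slot serves its own profile, and a large slot can also serve a small one.
    static boolean canFulfill(String slotProfile, String requiredProfile) {
        return slotProfile.equals(requiredProfile)
                || (slotProfile.equals("large") && requiredProfile.equals("small"));
    }

    /** Returns the requirement the offered slot was matched to, or null if the offer is rejected. */
    String offer(String slotProfile) {
        // Prefer an exact match so that a large slot is not spent on a small requirement needlessly.
        if (outstanding.getOrDefault(slotProfile, 0) > 0) {
            outstanding.merge(slotProfile, -1, Integer::sum);
            return slotProfile;
        }
        for (Map.Entry<String, Integer> entry : outstanding.entrySet()) {
            if (entry.getValue() > 0 && canFulfill(slotProfile, entry.getKey())) {
                entry.setValue(entry.getValue() - 1);
                return entry.getKey();
            }
        }
        // No outstanding requirement can use this slot, so the pool rejects it.
        return null;
    }
}
```

For the example's requirements (`large: 1`, `small: 1`), offering one large and one small slot yields the "perfect match" mapping, and a third small slot is rejected because nothing is outstanding.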
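The reservation lifecycle from the DeclarativeSlotPool section (the scheduler iterates over free slots and reserves them; a reserved slot is either freed back into the pool or explicitly released) can be modelled as a small state machine. The class and method names below are hypothetical and slot ids are plain strings; this is a sketch of the described behaviour, not the pool's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of the reserve/free/release cycle; slot ids are plain strings.
final class SlotLifecycle {
    enum State { FREE, RESERVED }

    // TreeMap keeps slot ids sorted so getFreeSlots() has a stable order.
    private final Map<String, State> slots = new TreeMap<>();

    void addAcceptedSlot(String slotId) {
        slots.put(slotId, State.FREE);
    }

    // The scheduler iterates over these and reserves the ones it wants.
    List<String> getFreeSlots() {
        return slots.entrySet().stream()
                .filter(e -> e.getValue() == State.FREE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    void reserve(String slotId) {
        if (slots.get(slotId) != State.FREE) {
            throw new IllegalStateException(slotId + " is not free");
        }
        slots.put(slotId, State.RESERVED);
    }

    // a) Freed by the scheduler: the slot stays in the pool for future use.
    void free(String slotId) {
        if (slots.get(slotId) != State.RESERVED) {
            throw new IllegalStateException(slotId + " is not reserved");
        }
        slots.put(slotId, State.FREE);
    }

    // b) Explicitly released (e.g. failed allocation, TaskExecutor shutdown): the slot leaves the pool.
    void release(String slotId) {
        slots.remove(slotId);
    }

    boolean contains(String slotId) {
        return slots.containsKey(slotId);
    }
}
```

The key distinction is that `free` keeps the slot available for later reservations, whereas `release` removes it from the pool entirely.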
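The idle-slot rule ("a slot that has been free for longer than the idleTimeout and is no longer required will be released", triggered periodically from the outside) can be sketched as below. As an assumption for brevity, the caller passes in the per-profile excess, i.e. how many held slots exceed the total requirements; `IdleSlotReleaser`, `markFree` and `releaseIdleSlots` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the caller supplies, per profile, how many held slots exceed the total requirements.
final class IdleSlotReleaser {
    private static final class FreeSlot {
        final String profile;
        final long freeSinceMillis;

        FreeSlot(String profile, long freeSinceMillis) {
            this.profile = profile;
            this.freeSinceMillis = freeSinceMillis;
        }
    }

    private final long idleTimeoutMillis;
    // Insertion order is the order in which slots became free.
    private final Map<String, FreeSlot> freeSlots = new LinkedHashMap<>();

    IdleSlotReleaser(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void markFree(String slotId, String profile, long nowMillis) {
        freeSlots.put(slotId, new FreeSlot(profile, nowMillis));
    }

    /** Triggered periodically from the outside; returns the ids of the released slots. */
    List<String> releaseIdleSlots(long nowMillis, Map<String, Integer> excessPerProfile) {
        Map<String, Integer> excess = new HashMap<>(excessPerProfile);
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, FreeSlot>> it = freeSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, FreeSlot> entry = it.next();
            String slotId = entry.getKey();
            FreeSlot slot = entry.getValue();
            boolean idle = nowMillis - slot.freeSinceMillis >= idleTimeoutMillis;
            int remainingExcess = excess.getOrDefault(slot.profile, 0);
            // Release only if the slot has been idle long enough AND is not needed for a requirement.
            if (idle && remainingExcess > 0) {
                excess.put(slot.profile, remainingExcess - 1);
                it.remove();
                released.add(slotId);
            }
        }
        return released;
    }
}
```

Both conditions are needed: an idle slot that still backs a requirement is kept, and an excess slot that only recently became free is also kept until its timeout expires.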
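The requirement adjustment from the small/large example can be expressed as a single rule applied on reservation: if a slot is reserved for a requirement other than the one it was matched to when offered, ask for one more of the originally matched profile and one fewer of the newly served profile. The sketch below is hypothetical (`RequirementAdjustingPool` and its methods are invented for illustration) and uses strings for profiles and slot ids.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the requirement adjustment; profiles and slot ids are plain strings.
final class RequirementAdjustingPool {
    private final Map<String, Integer> requirements = new HashMap<>();
    // Slot id -> requirement the slot is currently counted against.
    private final Map<String, String> slotToRequirement = new HashMap<>();

    void setRequirement(String profile, int count) {
        requirements.put(profile, count);
    }

    // Records the mapping computed when the slot was offered and accepted.
    void addMatchedSlot(String slotId, String matchedRequirement) {
        slotToRequirement.put(slotId, matchedRequirement);
    }

    void reserve(String slotId, String requiredProfile) {
        String previouslyMatched = slotToRequirement.get(slotId);
        if (previouslyMatched == null) {
            throw new IllegalArgumentException("Unknown slot " + slotId);
        }
        if (!previouslyMatched.equals(requiredProfile)) {
            slotToRequirement.put(slotId, requiredProfile);
            // The requirement the slot was matched to is unfulfilled again: ask for one more of it.
            requirements.merge(previouslyMatched, 1, Integer::sum);
            // The requirement the slot now serves needs one slot fewer.
            requirements.merge(requiredProfile, -1, Integer::sum);
        }
    }

    int getRequirement(String profile) {
        return requirements.getOrDefault(profile, 0);
    }

    String getMapping(String slotId) {
        return slotToRequirement.get(slotId);
    }
}
```

Starting from `large: 1` / `small: 1` and reserving `largeSlot` for the small requirement leaves `large: 2` / `small: 0` with `largeSlot` mapped to the small requirement, which is the second table of the example. In this sketch `smallSlot` keeps its mapping but no longer backs any requirement, so it becomes excess and is a candidate for release.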
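The ResourceCounter section motivates immutability by the counter being exposed at various points in the API. A minimal sketch of that idea follows, assuming string profiles; `ImmutableResourceCounter` is a hypothetical class and is not Flink's actual `ResourceCounter`. Every operation returns a new instance, so a counter handed to a listener or caller cannot be changed behind the pool's back.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an immutable counter; this is not Flink's actual ResourceCounter class.
final class ImmutableResourceCounter {
    private final Map<String, Integer> counts;

    private ImmutableResourceCounter(Map<String, Integer> counts) {
        // Callers always pass a fresh copy; wrapping it prevents any later mutation.
        this.counts = Collections.unmodifiableMap(counts);
    }

    static ImmutableResourceCounter empty() {
        return new ImmutableResourceCounter(new HashMap<>());
    }

    // Every "mutation" returns a new instance and leaves this one untouched.
    ImmutableResourceCounter add(String profile, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        if (n == 0) {
            return this; // safe to share because instances never change
        }
        Map<String, Integer> copy = new HashMap<>(counts);
        copy.merge(profile, n, Integer::sum);
        return new ImmutableResourceCounter(copy);
    }

    // Counts never go below zero; profiles that reach zero are dropped.
    ImmutableResourceCounter subtract(String profile, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        Map<String, Integer> copy = new HashMap<>(counts);
        int remaining = copy.getOrDefault(profile, 0) - n;
        if (remaining > 0) {
            copy.put(profile, remaining);
        } else {
            copy.remove(profile);
        }
        return new ImmutableResourceCounter(copy);
    }

    int getCount(String profile) {
        return counts.getOrDefault(profile, 0);
    }

    boolean isEmpty() {
        return counts.isEmpty();
    }
}
```

Clamping at zero in `subtract` is a design choice made for this sketch; a real implementation could equally reject underflow.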
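The split described in the AutoRequirementDecrementingSlotPoolWrapper section (increments issued explicitly elsewhere, decrements done automatically when slots are freed or released) suggests a decorator. The sketch below assumes a hypothetical `MinimalSlotPool` interface and a hypothetical `recordReservation` hook, and it assumes that only slots recorded as reserved trigger a decrement; none of this is taken from the actual wrapper's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal pool interface; not Flink's DeclarativeSlotPool API.
interface MinimalSlotPool {
    void increaseRequirement(String profile);
    void decreaseRequirement(String profile);
    void freeReservedSlot(String slotId);
    void releaseSlot(String slotId);
}

// Decorator: freeing or releasing a reserved slot also withdraws the requirement it was reserved for.
final class AutoDecrementingWrapper implements MinimalSlotPool {
    private final MinimalSlotPool delegate;
    private final Map<String, String> reservedFor = new HashMap<>(); // slot id -> requirement profile

    AutoDecrementingWrapper(MinimalSlotPool delegate) {
        this.delegate = delegate;
    }

    // Hypothetical hook: records which requirement a reserved slot serves.
    void recordReservation(String slotId, String profile) {
        reservedFor.put(slotId, profile);
    }

    // Increments are passed through unchanged; per the description they are issued explicitly elsewhere.
    @Override public void increaseRequirement(String profile) { delegate.increaseRequirement(profile); }
    @Override public void decreaseRequirement(String profile) { delegate.decreaseRequirement(profile); }

    @Override public void freeReservedSlot(String slotId) {
        delegate.freeReservedSlot(slotId);
        decrementFor(slotId);
    }

    @Override public void releaseSlot(String slotId) {
        delegate.releaseSlot(slotId);
        decrementFor(slotId);
    }

    // Removing the entry guarantees at most one decrement per reservation.
    private void decrementFor(String slotId) {
        String profile = reservedFor.remove(slotId);
        if (profile != null) {
            delegate.decreaseRequirement(profile);
        }
    }
}

// Recording delegate used to observe the wrapper's effect on requirements.
final class CountingPool implements MinimalSlotPool {
    final Map<String, Integer> requirements = new HashMap<>();
    public void increaseRequirement(String p) { requirements.merge(p, 1, Integer::sum); }
    public void decreaseRequirement(String p) { requirements.merge(p, -1, Integer::sum); }
    public void freeReservedSlot(String slotId) { }
    public void releaseSlot(String slotId) { }
}
```

Freeing a reserved slot and then releasing it decrements the requirement only once, because the wrapper forgets the reservation on the first call.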
