tillrohrmann opened a new pull request #15251: URL: https://github.com/apache/flink/pull/15251
This PR adds functionality to let the `AdaptiveScheduler` create the `ExecutionGraph` asynchronously. This is necessary to keep blocking I/O operations (retrieving checkpoints) out of the main thread. The change essentially consists of three steps:

1. Introducing a new `CreatingExecutionGraph` state between `WaitingForResources` and `Executing`
2. Splitting the determination of the job parallelism and the reservation of slots into two steps
3. Tracking asynchronous background tasks and waiting for their completion before closing the checkpoint services

### New `CreatingExecutionGraph` state

The new `CreatingExecutionGraph` state models that an asynchronous `ExecutionGraph` creation is in progress. Because the operation happens asynchronously, the set of available slots can change in the meantime:

* If the `ExecutionGraph` creation succeeds and we can assign the required slots, then we go to the `Executing` state.
* If the `ExecutionGraph` creation succeeds but we cannot assign the required slots (e.g. because some slots were lost), then we go back to the `WaitingForResources` state.
* If the `ExecutionGraph` creation fails, then we go to the `Finished` state with a `JobStatus.FAILED` result.

### Splitting the determination of job parallelism and the reservation of slots

Since the `ExecutionGraph` is no longer created directly after its parallelism has been decided, the required slots can no longer be reserved at that point either. To support this two-step process, the `SlotAllocator` interface was adjusted to make `reserveResources` fallible. The new API is `Optional<? extends ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism);`, which returns `Optional.empty()` if the slots cannot be reserved. This allows the `AdaptiveScheduler` to tell the `CreatingExecutionGraph` state that it could not reserve the required slots, so that the scheduler can transition back into the `WaitingForResources` state.
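The transitions out of `CreatingExecutionGraph`, together with the fallible slot reservation, can be sketched in Java as follows. This is a simplified model, not Flink's code: `State`, `CountingSlotAllocator`, `onGraphCreationCompleted` and `run` are hypothetical names, `VertexParallelism` and `ReservedSlots` are toy stand-ins for the real classes, `FINISHED_FAILED` stands for "`Finished` with `JobStatus.FAILED`", and only the `tryReserveResources` signature is taken from the description above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class CreatingExecutionGraphSketch {

    // Hypothetical subset of the scheduler states reachable from CreatingExecutionGraph.
    enum State { WAITING_FOR_RESOURCES, EXECUTING, FINISHED_FAILED }

    // Toy stand-ins for Flink's VertexParallelism and ReservedSlots (assumptions).
    static final class VertexParallelism {
        final int parallelism;
        VertexParallelism(int parallelism) { this.parallelism = parallelism; }
    }

    static final class ReservedSlots {
        final int count;
        ReservedSlots(int count) { this.count = count; }
    }

    // Fallible reservation with the signature described in the PR: empty means "cannot reserve".
    interface SlotAllocator {
        Optional<? extends ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism);
    }

    // Hypothetical allocator backed by a count of currently free slots.
    static final class CountingSlotAllocator implements SlotAllocator {
        private int freeSlots;

        CountingSlotAllocator(int freeSlots) { this.freeSlots = freeSlots; }

        @Override
        public Optional<? extends ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) {
            if (freeSlots < vertexParallelism.parallelism) {
                return Optional.empty(); // e.g. slots were lost while the graph was being created
            }
            freeSlots -= vertexParallelism.parallelism;
            return Optional.of(new ReservedSlots(vertexParallelism.parallelism));
        }
    }

    // Decides the next state once the asynchronous ExecutionGraph creation has completed.
    static State onGraphCreationCompleted(
            Throwable failure, SlotAllocator allocator, VertexParallelism vertexParallelism) {
        if (failure != null) {
            return State.FINISHED_FAILED; // creation failed: Finished with JobStatus.FAILED
        }
        return allocator.tryReserveResources(vertexParallelism)
                .map(slots -> State.EXECUTING)              // slots reserved: start executing
                .orElse(State.WAITING_FOR_RESOURCES);       // slots gone: wait for resources again
    }

    // Simulates one pass through CreatingExecutionGraph.
    static State run(boolean creationFails, int freeSlots, int parallelism) {
        SlotAllocator allocator = new CountingSlotAllocator(freeSlots);
        VertexParallelism vertexParallelism = new VertexParallelism(parallelism);

        // Stand-in for building the ExecutionGraph (and retrieving checkpoints) off the main thread.
        CompletableFuture<String> graphFuture = CompletableFuture.supplyAsync(() -> {
            if (creationFails) {
                throw new IllegalStateException("could not create ExecutionGraph");
            }
            return "executionGraph";
        });

        // The real scheduler would handle the result in its main thread; join() keeps the demo simple.
        return graphFuture
                .handle((graph, failure) -> onGraphCreationCompleted(failure, allocator, vertexParallelism))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run(false, 4, 4)); // EXECUTING
        System.out.println(run(false, 2, 4)); // WAITING_FOR_RESOURCES
        System.out.println(run(true, 4, 4));  // FINISHED_FAILED
    }
}
```

Returning `Optional.empty()` rather than throwing makes "the slots disappeared in the meantime" an ordinary outcome that the state can branch on, which is what allows the transition back to `WaitingForResources` instead of failing the job.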
### Tracking of asynchronous tasks

With the introduction of the asynchronous `ExecutionGraph` creation, we also introduced concurrent accesses to the checkpoint services (`CompletedCheckpointStore`, `CheckpointIdCounter`): they are now accessed both by the `ioExecutor`, which creates the `ExecutionGraph`, and by the main thread when the `Scheduler` reaches the `Finished` state. To solve this problem, the `AdaptiveScheduler` now tracks its background/asynchronous tasks and allows only a single such task to run at a time. Before shutting down the checkpoint services, it waits for these tasks to complete, so that the `CompletedCheckpointStore` and the `CheckpointIdCounter` are never accessed concurrently.
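The tracking idea can be illustrated with a minimal Java sketch. It assumes that "a single task at a time" is enforced by chaining, i.e. each new task starts only after the previous one has terminated; `BackgroundTaskTracker`, `runAfterPrevious`, `FakeCheckpointStore` and `runDemo` are hypothetical names, not the classes added in this PR, and the fake store merely stands in for `CompletedCheckpointStore`/`CheckpointIdCounter` so the demo can detect overlapping accesses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BackgroundTaskTrackerSketch {

    // Hypothetical tracker: each task starts only after the previous one terminated.
    static final class BackgroundTaskTracker {
        private final ExecutorService ioExecutor;
        // Completes (normally) once the most recently submitted task has terminated.
        private CompletableFuture<Void> lastTermination = CompletableFuture.completedFuture(null);

        BackgroundTaskTracker(ExecutorService ioExecutor) {
            this.ioExecutor = ioExecutor;
        }

        // Called from the main thread only, so lastTermination needs no synchronization.
        <T> CompletableFuture<T> runAfterPrevious(Supplier<T> task) {
            CompletableFuture<T> result = lastTermination.thenApplyAsync(ignored -> task.get(), ioExecutor);
            // Success and failure both mean the task no longer touches the shared services.
            lastTermination = result.handle((value, failure) -> null);
            return result;
        }

        // Completes once every task submitted so far has terminated.
        CompletableFuture<Void> terminationFuture() {
            return lastTermination;
        }
    }

    // Hypothetical stand-in for a checkpoint service that records unsafe accesses.
    static final class FakeCheckpointStore {
        private final AtomicInteger activeAccesses = new AtomicInteger();
        private volatile boolean shutDown = false;
        volatile boolean violationDetected = false;

        void access() {
            if (activeAccesses.incrementAndGet() > 1 || shutDown) {
                violationDetected = true; // overlapping access or access after shutdown
            }
            try {
                Thread.sleep(20); // simulate blocking I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                activeAccesses.decrementAndGet();
            }
        }

        void shutdown() {
            if (activeAccesses.get() > 0) {
                violationDetected = true; // a background task is still using the store
            }
            shutDown = true;
        }
    }

    static boolean runDemo() {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
        FakeCheckpointStore store = new FakeCheckpointStore();
        BackgroundTaskTracker tracker = new BackgroundTaskTracker(ioExecutor);
        try {
            // e.g. repeated ExecutionGraph creations, each reading from the checkpoint store
            for (int i = 0; i < 3; i++) {
                tracker.runAfterPrevious(() -> {
                    store.access();
                    return "executionGraph";
                });
            }
            // Wait for all background tasks before shutting down the checkpoint services.
            tracker.terminationFuture().join();
            store.shutdown();
            return !store.violationDetected;
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("no concurrent access: " + runDemo());
    }
}
```

Although the executor has four threads, chaining on the previous task's termination future serializes the accesses. Because that future completes even when a task fails, a failed `ExecutionGraph` creation does not block the shutdown of the checkpoint services.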
