tillrohrmann opened a new pull request #15251:
URL: https://github.com/apache/flink/pull/15251


   This PR lets the `AdaptiveScheduler` create the `ExecutionGraph` 
asynchronously. This is necessary to keep blocking I/O operations (retrieving 
checkpoints) off the main thread. The change consists of three parts:
   
   1. Introducing a new `CreatingExecutionGraph` state between 
`WaitingForResources` and `Executing`
   2. Splitting the determination of the job parallelism and the reservation of 
slots into two steps
   3. Tracking of asynchronous background tasks and waiting on their completion 
before closing checkpoint services
   
   ### New `CreatingExecutionGraph` state
   
   The new `CreatingExecutionGraph` state models an `ExecutionGraph` creation 
that is running asynchronously. Because the creation does not block the main 
thread, the set of available slots can change while it is in progress. The 
state therefore handles three outcomes:
   
   * If the `ExecutionGraph` creation succeeds and if we can assign the 
required slots, then we go to the `Executing` state.
   * If the `ExecutionGraph` creation succeeds and we cannot assign the 
required slots (e.g. some slots were lost), then we go back to the 
`WaitingForResources` state.
   * If the `ExecutionGraph` creation fails, then we go into the `Finished` 
state with a `JobStatus.FAILED` result.
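
   The three outcomes above can be sketched as a small decision function. This 
is a simplified illustration, not the code from this PR: the class 
`CreatingExecutionGraphSketch`, the enum `NextState`, and the method 
`onCreationCompleted` are hypothetical names chosen for the example.

```java
// Hypothetical, simplified model of the transitions listed above;
// the class, enum, and method names are illustrative, not Flink's actual classes.
final class CreatingExecutionGraphSketch {

    /** Follow-up states reachable from the CreatingExecutionGraph state. */
    enum NextState { EXECUTING, WAITING_FOR_RESOURCES, FINISHED_FAILED }

    /**
     * Decides the follow-up state once the asynchronous creation has completed.
     *
     * @param creationFailure non-null if creating the ExecutionGraph failed
     * @param slotsAssigned whether the required slots could still be assigned
     */
    static NextState onCreationCompleted(Throwable creationFailure, boolean slotsAssigned) {
        if (creationFailure != null) {
            // Creation failed: the job ends with a FAILED result.
            return NextState.FINISHED_FAILED;
        }
        // Creation succeeded: run only if the slots are still available.
        return slotsAssigned ? NextState.EXECUTING : NextState.WAITING_FOR_RESOURCES;
    }

    public static void main(String[] args) {
        System.out.println(onCreationCompleted(null, true));
        System.out.println(onCreationCompleted(null, false));
        System.out.println(onCreationCompleted(new RuntimeException("boom"), true));
    }
}
```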
   
   ### Splitting of determining job parallelism and reserving slots
   
   Since we no longer create the `ExecutionGraph` immediately after deciding 
on its parallelism, we cannot reserve the required slots at the moment the 
parallelism is determined. To support this two-step process, the 
`SlotAllocator` interface was adjusted so that `reserveResources` can fail. 
The new API is `Optional<? extends ReservedSlots> 
tryReserveResources(VertexParallelism vertexParallelism);`, which returns 
`Optional.empty()` if the slots cannot be reserved. This allows the 
`AdaptiveScheduler` to tell the `CreatingExecutionGraph` state that it could 
not reserve the required slots, which lets the scheduler transition back into 
the `WaitingForResources` state.
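
   To illustrate the two-step process, here is a minimal sketch of a failable 
reservation. Only the method name `tryReserveResources` and the type names 
`VertexParallelism` and `ReservedSlots` come from the description above. Their 
contents here are placeholder assumptions (slots modelled as plain string IDs), 
and `SimpleSlotAllocator` is a hypothetical stand-in, not the actual 
`SlotAllocator` implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Simplified stand-ins for illustration; the real Flink types look different.
final class TryReserveSketch {

    /** Placeholder: records which slots the parallelism decision was based on. */
    static final class VertexParallelism {
        final Set<String> requiredSlotIds;

        VertexParallelism(String... slotIds) {
            this.requiredSlotIds = new HashSet<>(Arrays.asList(slotIds));
        }
    }

    /** Placeholder: the slots that were successfully reserved. */
    static final class ReservedSlots {
        final Set<String> slotIds;

        ReservedSlots(Set<String> slotIds) {
            this.slotIds = new HashSet<>(slotIds);
        }
    }

    /** Hypothetical allocator that tracks free slots by ID. */
    static final class SimpleSlotAllocator {
        private final Set<String> freeSlots = new HashSet<>();

        void addSlot(String slotId) {
            freeSlots.add(slotId);
        }

        void removeSlot(String slotId) {
            freeSlots.remove(slotId);
        }

        /** Step two: reserve the slots, or report that they are no longer available. */
        Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) {
            if (!freeSlots.containsAll(vertexParallelism.requiredSlotIds)) {
                // Some slot was lost since the parallelism was decided.
                return Optional.empty();
            }
            freeSlots.removeAll(vertexParallelism.requiredSlotIds);
            return Optional.of(new ReservedSlots(vertexParallelism.requiredSlotIds));
        }
    }

    public static void main(String[] args) {
        SimpleSlotAllocator allocator = new SimpleSlotAllocator();
        allocator.addSlot("slot-1");
        allocator.addSlot("slot-2");

        // Step one: decide on the parallelism while both slots are available.
        VertexParallelism decided = new VertexParallelism("slot-1", "slot-2");

        // A slot is lost while the ExecutionGraph is being created.
        allocator.removeSlot("slot-2");
        System.out.println(allocator.tryReserveResources(decided).isPresent()); // false

        // Once the slot is back, the reservation succeeds.
        allocator.addSlot("slot-2");
        System.out.println(allocator.tryReserveResources(decided).isPresent()); // true
    }
}
```

   Returning an empty `Optional` instead of throwing keeps a lost slot on the 
normal control-flow path, so the caller can simply go back to waiting for 
resources.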
   
   ### Tracking of asynchronous tasks
   
   With the asynchronous `ExecutionGraph` creation, the checkpoint services 
(`CompletedCheckpointStore`, `CheckpointIdCounter`) can now be accessed 
concurrently: by the `ioExecutor`, which creates the `ExecutionGraph`, and by 
the main thread when the `Scheduler` reaches the `Finished` state. To prevent 
this, the `AdaptiveScheduler` now tracks its background/asynchronous tasks and 
allows only a single task to run at a time. Before shutting down the checkpoint 
services, it waits for these tasks to complete, which ensures that no 
concurrent accesses to the `CompletedCheckpointStore` and the 
`CheckpointIdCounter` happen.
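
   One way to get "a single task at a time, and close only after the last one" 
is to chain each new task onto the previous one with `CompletableFuture`. The 
sketch below shows that idea under stated assumptions. It is not the tracking 
code from this PR: `BackgroundTaskTrackerSketch`, `runAfterPrevious`, and 
`closeAfterTasks` are hypothetical names, and the real scheduler runs its close 
step in the main thread rather than in whichever thread completes the last task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical tracker for illustration; names are not taken from Flink.
final class BackgroundTaskTrackerSketch {

    private final Executor ioExecutor;

    // Completes (always normally) once the most recently submitted task is done.
    private CompletableFuture<Void> lastTask = CompletableFuture.completedFuture(null);

    BackgroundTaskTrackerSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Runs the task on the I/O executor, but only after all earlier tasks have finished. */
    synchronized <T> CompletableFuture<T> runAfterPrevious(Supplier<T> task) {
        CompletableFuture<T> result = lastTask.thenApplyAsync(ignored -> task.get(), ioExecutor);
        // Swallow the outcome so that a failed task does not block later tasks.
        lastTask = result.handle((value, failure) -> null);
        return result;
    }

    /** Runs the close action once every tracked task has finished. */
    synchronized CompletableFuture<Void> closeAfterTasks(Runnable closeServices) {
        return lastTask.thenRun(closeServices);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        BackgroundTaskTrackerSketch tracker = new BackgroundTaskTrackerSketch(pool);

        tracker.runAfterPrevious(() -> events.add("create ExecutionGraph"));
        tracker.runAfterPrevious(() -> events.add("another background task"));
        tracker.closeAfterTasks(() -> events.add("close checkpoint services")).join();

        pool.shutdown();
        System.out.println(events);
    }
}
```

   Because every task is chained onto its predecessor, tasks run one after the 
other even on a multi-threaded executor, and the close action cannot overlap 
with any of them.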



