StephanEwen opened a new pull request #12225:
URL: https://github.com/apache/flink/pull/12225


   ## What is the purpose of the change
   
   This PR makes it possible to safely use spawned threads in the 
`OperatorCoordinator`.
   
   ### The Previous Issue
   
   Before, calls on the `Context` in the `OperatorCoordinator` went directly 
and synchronously to the `ExecutionGraph`. That introduces two critical problems:
   
     - It is common that the code in the `OperatorCoordinator` runs in a 
separate thread (for example, because it executes blocking operations). In 
fact, the *FLIP-27* Source Enumerators do that. Calling the scheduler from 
another thread causes the Scheduler to crash (Assertion Error, violation of 
the single-threaded property).
     - Calls on the `ExecutionGraph` are removed as part of removing the legacy 
scheduler. Certain calls do not work any more.
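
The first problem can be illustrated with a minimal sketch. It is not Flink code: `ConfinedScheduler` and every other name below are hypothetical stand-ins, and an `IllegalStateException` takes the place of the assertion error. The sketch only assumes that the scheduler checks which thread is calling it. A direct call from a foreign thread is rejected, while the same call handed to the owning thread goes through.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration; none of these names are Flink classes. */
final class ThreadConfinementSketch {

    /** Stand-in for a scheduler that may only be called from its own thread. */
    static final class ConfinedScheduler {
        private volatile Thread owner;
        private int failures;

        void bindToCurrentThread() {
            owner = Thread.currentThread();
        }

        void failJob() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                        "called from " + Thread.currentThread().getName());
            }
            failures++;
        }

        int failures() {
            return failures;
        }
    }

    /** Calls the scheduler from the calling thread; returns true if that was rejected. */
    static boolean directCallRejected(ConfinedScheduler scheduler) {
        try {
            scheduler.failJob();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Returns a summary of the form "rejected=<boolean>, failures=<count>". */
    static String demo() {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            ConfinedScheduler scheduler = new ConfinedScheduler();
            CompletableFuture.runAsync(scheduler::bindToCurrentThread, mainThread).join();

            // The caller plays the role of a thread spawned by the coordinator.
            boolean rejected = directCallRejected(scheduler);

            // Handing the call to the owning thread satisfies the thread check.
            CompletableFuture.runAsync(scheduler::failJob, mainThread).join();
            int failures =
                    CompletableFuture.supplyAsync(scheduler::failures, mainThread).join();

            return "rejected=" + rejected + ", failures=" + failures;
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Calling `ThreadConfinementSketch.demo()` from any thread other than the executor's own thread exercises both paths: the rejected direct call and the accepted handoff.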
   
   ### Problem Level 1:
   
   The natural solution would be to pass in the scheduler and a main thread 
executor to interact with it.
   
   However, to do that the scheduler needs to be created before the Operator 
Coordinators are created. One could do that by creating the Coordinators lazily 
after the Scheduler.
   
   ### Problem Level 2:
   
   The Scheduler restores the savepoints as part of the scheduler creation, 
when the `ExecutionGraph` and the `CheckpointCoordinator` are created early in 
the constructor.
   (Side note: That design is tricky in itself, because it means state is 
restored before the scheduler is even properly constructed.)
   
   That means the `OperatorCoordinator` (or a placeholder component) needs to 
exist to accept the restored state.
   
   That brings us to a cyclic dependency:
   
     - `OperatorCoordinator` (context) needs `SchedulerNG` and 
`MainThreadExecutor`
     - `SchedulerNG` and `MainThreadExecutor` need a constructed 
`ExecutionGraph`
     - `ExecutionGraph` needs `CheckpointCoordinator`
     - `CheckpointCoordinator` needs `OperatorCoordinator`
   
   
   ## Brief change log (a.k.a. Breaking the Cycle)
   
   The only way we can currently solve this (without a massive scheduler 
refactoring) is with a form of lazy initialization:
   
     - We eagerly create the `OperatorCoordinators` so they exist for state 
restore
     - We provide an *uninitialized* `Context` to them
     - When the `SchedulerNG` is started (after leadership is granted) we 
initialize the `Context` with the (then readily constructed) `SchedulerNG` and 
`MainThreadExecutor`
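
The lazy initialization described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: `LazyContext`, `lazyInitialize`, and `failJob` are hypothetical names, a plain `Consumer<Throwable>` stands in for the scheduler, and a `java.util.concurrent.Executor` stands in for the main thread executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical illustration; the names are not taken from the PR. */
final class LazyContextSketch {

    /** Context that exists early but becomes usable only after lazyInitialize(). */
    static final class LazyContext {
        private Consumer<Throwable> jobFailureHandler; // stand-in for the scheduler
        private Executor mainThreadExecutor;           // stand-in for the main thread executor

        synchronized void lazyInitialize(Consumer<Throwable> handler, Executor executor) {
            this.jobFailureHandler = Objects.requireNonNull(handler);
            this.mainThreadExecutor = Objects.requireNonNull(executor);
        }

        /** Callable from any thread; the handler itself runs on the executor. */
        synchronized void failJob(Throwable cause) {
            if (mainThreadExecutor == null) {
                throw new IllegalStateException("Context was not yet initialized");
            }
            final Consumer<Throwable> handler = jobFailureHandler;
            mainThreadExecutor.execute(() -> handler.accept(cause));
        }
    }

    /** Walks through the lifecycle and returns the observed events in order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();

        // Created eagerly, so a coordinator holding it can exist for state restore.
        LazyContext context = new LazyContext();

        try {
            context.failJob(new RuntimeException("too early"));
        } catch (IllegalStateException e) {
            events.add("rejected before init");
        }

        // Later, once the scheduler is started. Runnable::run executes inline here;
        // a real setup would pass the actual main thread executor instead.
        context.lazyInitialize(t -> events.add("failed: " + t.getMessage()), Runnable::run);
        context.failJob(new RuntimeException("boom"));

        return events;
    }
}
```

In this sketch a call made before initialization fails fast instead of being queued. Queuing would be another option, but it is not something this description states.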
   
   ## Suggested Follow-up (A longer-term Solution)
   
   The longer term solution would require a major change in the 
`DefaultScheduler` and `CheckpointCoordinator` setup. Something like this:
   
     - `DefaultScheduler` (and `ExecutionGraph`) are constructed first
     - `JobMaster` waits for leadership
     - Upon leader grant, the `OperatorCoordinators` (they are part of the 
`JobMaster`) are constructed and can reference the `SchedulerNG` and 
`FencedMainThreadExecutor`
     - `CheckpointCoordinator` is constructed and references `ExecutionGraph` 
and `OperatorCoordinators`
     - Savepoint or latest checkpoint is restored
   
   
   The implementation in this PR tries to couple parts as loosely as possible 
to make it easy to implement the above approach later.
   
   ## Verifying this change
   
   This change is an internal refactor of existing components. The contracts 
are tested by existing unit tests.
   
   The fact that a separate thread can be used in the `OperatorCoordinator` is 
tested by an integration test case in a follow-up PR about end-to-end 
coordinator exactly-once guarantees.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't 
know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   

