StephanEwen opened a new pull request #12225:
URL: https://github.com/apache/flink/pull/12225
## What is the purpose of the change
This PR makes it possible to safely use spawned threads in the
`OperatorCoordinator`.
### The Previous Issue
Before, calls on the `Context` in the `OperatorCoordinator` went directly
and synchronously to the `ExecutionGraph`. That introduces two critical problems:
- It is common that the code in the `OperatorCoordinator` runs in a
separate thread (for example, because it executes blocking operations). In
fact, the *FLIP-27* Source Enumerators do that. Calling the scheduler from
another thread causes the Scheduler to crash (assertion error, violation of
the single-threaded property).
- Methods on the `ExecutionGraph` are being removed as part of removing the legacy
scheduler, so certain calls no longer work.
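To make the first problem concrete, here is a minimal sketch of a component that, like the scheduler, insists on being called only from its main thread. The names `SingleThreadedComponent`, `triggerSomething`, and `callFromSpawnedThread` are hypothetical stand-ins, not Flink's scheduler API; the explicit thread check only mimics the kind of single-threaded assertion described above.

```java
import java.util.concurrent.atomic.AtomicReference;

public class MainThreadCheckSketch {

    /** Hypothetical stand-in for a component that must only be called from its main thread. */
    static final class SingleThreadedComponent {
        private final Thread mainThread;
        private int invocations;

        SingleThreadedComponent(Thread mainThread) {
            this.mainThread = mainThread;
        }

        void triggerSomething() {
            // Fail fast if the single-threaded contract is violated.
            if (Thread.currentThread() != mainThread) {
                throw new AssertionError(
                        "single-threaded access violated by " + Thread.currentThread().getName());
            }
            invocations++;
        }

        int invocations() {
            return invocations;
        }
    }

    /** Calls the component from a freshly spawned thread and returns the error it raised, if any. */
    static Throwable callFromSpawnedThread(SingleThreadedComponent component) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                component.triggerSomething();
            } catch (Throwable t) {
                failure.set(t);
            }
        }, "coordinator-worker");
        worker.start();
        worker.join(); // join() makes the worker's write to 'failure' visible here
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadedComponent component = new SingleThreadedComponent(Thread.currentThread());
        component.triggerSomething(); // allowed: we are on the main thread
        Throwable error = callFromSpawnedThread(component);
        System.out.println("invocations=" + component.invocations() + ", error=" + error);
    }
}
```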
### Problem Level 1:
The natural solution would be to pass in the scheduler and a main thread
executor to interact with it.
However, to do that the scheduler needs to be created before the Operator
Coordinators are created. One could do that by creating the Coordinators lazily
after the Scheduler.
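The "natural solution" can be sketched as follows: the coordinator keeps doing blocking work on its own thread but hands every scheduler interaction to a main thread executor, so the scheduler is only ever touched from one thread. This is an illustration under assumptions: `Scheduler`, `Coordinator`, and `reportFailure` are hypothetical names, and a single-threaded `ExecutorService` plays the role of Flink's main thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadExecutorSketch {

    /** Hypothetical scheduler stand-in that reports which thread handled the call. */
    static final class Scheduler {
        String handleFailure(String reason) {
            return reason + " handled on " + Thread.currentThread().getName();
        }
    }

    /**
     * Hypothetical coordinator: blocking work runs on its own thread, while every
     * scheduler interaction is handed to the main thread executor.
     */
    static final class Coordinator {
        private final Scheduler scheduler;
        private final Executor mainThreadExecutor;
        private final ExecutorService workerThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-worker"));

        Coordinator(Scheduler scheduler, Executor mainThreadExecutor) {
            this.scheduler = scheduler;
            this.mainThreadExecutor = mainThreadExecutor;
        }

        CompletableFuture<String> reportFailure(String reason) {
            return CompletableFuture
                    // Simulated blocking work on the coordinator's own thread.
                    .supplyAsync(() -> reason, workerThread)
                    // The scheduler call runs on the main thread executor, never on the worker.
                    .thenApplyAsync(scheduler::handleFailure, mainThreadExecutor);
        }

        void shutdown() {
            workerThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        Coordinator coordinator = new Coordinator(new Scheduler(), mainThread);
        System.out.println(coordinator.reportFailure("split enumeration failed").get());
        coordinator.shutdown();
        mainThread.shutdown();
    }
}
```

Note that the `Coordinator` constructor needs the scheduler and executor up front, which is exactly the ordering requirement described above.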
### Problem Level 2:
The Scheduler restores savepoints as part of its own creation, at the point
where the `ExecutionGraph` and the `CheckpointCoordinator` are created early in
the constructor.
(Side note: That design is tricky in itself, because it means state is
restored before the scheduler is even properly constructed.)
That means the `OperatorCoordinator` (or a placeholder component) needs to
exist to accept the restored state.
That brings us to a cyclic dependency:
- `OperatorCoordinator` (context) needs `SchedulerNG` and
`MainThreadExecutor`
- `SchedulerNG` and `MainThreadExecutor` need a constructed
`ExecutionGraph`
- `ExecutionGraph` needs `CheckpointCoordinator`
- `CheckpointCoordinator` needs `OperatorCoordinator`
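As a small worked check (purely illustrative, not Flink code), the four dependencies listed above can be encoded as a directed graph and searched for a cycle with a depth-first search. The component names come from the list; `dependencies` and `findCycle` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DependencyCycleSketch {

    /** "A needs B" edges, transcribed from the list above. */
    static Map<String, List<String>> dependencies() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("OperatorCoordinator", List.of("SchedulerNG", "MainThreadExecutor"));
        deps.put("SchedulerNG", List.of("ExecutionGraph"));
        deps.put("MainThreadExecutor", List.of("ExecutionGraph"));
        deps.put("ExecutionGraph", List.of("CheckpointCoordinator"));
        deps.put("CheckpointCoordinator", List.of("OperatorCoordinator"));
        return deps;
    }

    /** Returns one cycle as a path (first node repeated at the end), or an empty list if acyclic. */
    static List<String> findCycle(Map<String, List<String>> deps) {
        Set<String> finished = new HashSet<>();
        for (String start : deps.keySet()) {
            List<String> cycle = dfs(start, deps, new ArrayList<>(), finished);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        return List.of();
    }

    private static List<String> dfs(
            String node, Map<String, List<String>> deps, List<String> path, Set<String> finished) {
        int index = path.indexOf(node);
        if (index >= 0) { // node is already on the current path: we closed a cycle
            List<String> cycle = new ArrayList<>(path.subList(index, path.size()));
            cycle.add(node);
            return cycle;
        }
        if (finished.contains(node)) {
            return List.of(); // fully explored earlier without finding a cycle
        }
        path.add(node);
        for (String next : deps.getOrDefault(node, List.of())) {
            List<String> cycle = dfs(next, deps, path, finished);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        path.remove(path.size() - 1);
        finished.add(node);
        return List.of();
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", findCycle(dependencies())));
    }
}
```

Dropping the construction-time edges out of `OperatorCoordinator` (replacing its entry with an empty list) leaves `findCycle` with nothing to report, which is the edge the lazy initialization in the change log below defers.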
## Brief change log (a.k.a. Breaking the Cycle)
The only way we can currently solve this (without a massive scheduler
refactoring) is with a form of lazy initialization:
- We eagerly create the `OperatorCoordinators` so they exist for state
restore
- We provide an *uninitialized* `Context` to them
- When the `SchedulerNG` is started (after leadership is granted), we
initialize the `Context` with the (then fully constructed) `SchedulerNG` and
`MainThreadExecutor`
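The lazy initialization described above can be sketched like this: the context is created empty and handed out eagerly, and only later receives the scheduler and main thread executor. This is a hypothetical illustration, not the PR's actual class: `LazyInitializedContext`, `initialize`, and `failJob` are made-up names, and rejecting calls before initialization is an assumption (the real implementation may handle that case differently).

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LazyContextSketch {

    /** Hypothetical scheduler stand-in. */
    interface Scheduler {
        void handleGlobalFailure(Throwable cause);
    }

    /** Hypothetical coordinator context that exists before the scheduler does. */
    static final class LazyInitializedContext {
        // volatile: written once on the main thread, read from arbitrary coordinator threads
        private volatile Scheduler scheduler;
        private volatile Executor mainThreadExecutor;

        /** Called once, when the scheduler is started (assumed to be on the main thread). */
        void initialize(Scheduler scheduler, Executor mainThreadExecutor) {
            if (this.mainThreadExecutor != null) {
                throw new IllegalStateException("Context already initialized");
            }
            this.scheduler = Objects.requireNonNull(scheduler);
            // Written last: a reader that sees a non-null executor also sees the scheduler.
            this.mainThreadExecutor = Objects.requireNonNull(mainThreadExecutor);
        }

        boolean isInitialized() {
            return mainThreadExecutor != null;
        }

        /** Safe to call from any thread: the scheduler call itself runs on the main thread. */
        void failJob(Throwable cause) {
            Executor executor = mainThreadExecutor;
            if (executor == null) {
                throw new IllegalStateException("Context not yet initialized");
            }
            Scheduler target = scheduler;
            executor.execute(() -> target.handleGlobalFailure(cause));
        }
    }

    public static void main(String[] args) throws Exception {
        // Created eagerly and handed to the coordinators, e.g. before state restore.
        LazyInitializedContext context = new LazyInitializedContext();

        // Later, once the scheduler is started:
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        CompletableFuture<String> handledOn = new CompletableFuture<>();
        context.initialize(cause -> handledOn.complete(Thread.currentThread().getName()), mainThread);

        // A coordinator-owned thread can now call the context safely.
        Thread worker = new Thread(() -> context.failJob(new RuntimeException("boom")), "coordinator-worker");
        worker.start();
        System.out.println("failure handled on " + handledOn.get());
        mainThread.shutdown();
    }
}
```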
## Suggested Follow-up (A longer-term Solution)
The longer-term solution would require a major change in the
`DefaultScheduler` and `CheckpointCoordinator` setup. Something like this:
- `DefaultScheduler` (and `ExecutionGraph`) are constructed first
- `JobMaster` waits for leadership
- Upon leader grant, `OperatorCoordinators` (they are part of the
`JobMaster`) are constructed and can reference the `SchedulerNG` and
`FencedMainThreadExecutor`
- `CheckpointCoordinator` is constructed and references `ExecutionGraph`
and `OperatorCoordinators`
- Savepoint or latest checkpoint is restored
The implementation in this PR tries to couple parts as loosely as possible
to make it easy to implement the above approach later.
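The construction order proposed above can be sketched with plain constructor injection and a leadership future: each component's constructor takes only components that already exist, so no cycle is possible, and nothing is restored before leadership is granted. All classes here are hypothetical stand-ins, and a plain `Executor` takes the place of the `FencedMainThreadExecutor`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class LeaderStartupOrderSketch {

    // Hypothetical stand-ins; each constructor only takes components that already exist.
    static final class ExecutionGraph {}

    static final class Scheduler {
        final ExecutionGraph executionGraph;

        Scheduler(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }
    }

    static final class Coordinator {
        final Scheduler scheduler;
        final Executor mainThreadExecutor;

        Coordinator(Scheduler scheduler, Executor mainThreadExecutor) {
            this.scheduler = scheduler;
            this.mainThreadExecutor = mainThreadExecutor;
        }
    }

    static final class CheckpointCoordinator {
        final ExecutionGraph executionGraph;
        final List<Coordinator> coordinators;
        boolean restored;

        CheckpointCoordinator(ExecutionGraph executionGraph, List<Coordinator> coordinators) {
            this.executionGraph = executionGraph;
            this.coordinators = coordinators;
        }

        void restoreSavepointOrLatestCheckpoint() {
            restored = true; // placeholder for the actual restore
        }
    }

    /** Builds scheduler and graph eagerly; everything else only once leadership is granted. */
    static CompletableFuture<CheckpointCoordinator> startUp(
            CompletableFuture<Void> leadershipGranted, Executor mainThreadExecutor) {
        ExecutionGraph graph = new ExecutionGraph();
        Scheduler scheduler = new Scheduler(graph);
        return leadershipGranted.thenApply(ignored -> {
            Coordinator coordinator = new Coordinator(scheduler, mainThreadExecutor);
            CheckpointCoordinator checkpoints = new CheckpointCoordinator(graph, List.of(coordinator));
            checkpoints.restoreSavepointOrLatestCheckpoint();
            return checkpoints;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> leadership = new CompletableFuture<>();
        CompletableFuture<CheckpointCoordinator> started = startUp(leadership, Runnable::run);
        System.out.println("started before leadership: " + started.isDone());
        leadership.complete(null);
        System.out.println("restored after leadership: " + started.join().restored);
    }
}
```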
## Verifying this change
This change is an internal refactor of existing components. The contracts
are tested by existing unit tests.
The fact that a separate thread can be used in the `OperatorCoordinator` is
tested by an integration test in a follow-up PR about end-to-end
coordinator exactly-once guarantees.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no /
don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't
know)
- The S3 file system connector: (yes / no / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs
/ not documented)