Stephan Ewen created FLINK-17781:
------------------------------------
Summary: OperatorCoordinator Context must support calls from
thread other than JobMaster Main Thread
Key: FLINK-17781
URL: https://issues.apache.org/jira/browse/FLINK-17781
Project: Flink
Issue Type: Sub-task
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.11.0
Currently, calls on the Context in the OperatorCoordinator go directly
synchronously to the ExcutionGraph.
There are two critical problems are:
- It is common that the code in the OperatorCoordinator runs in a separate
thread (for example, because it executes blocking operations). Calling the
scheduler from another thread causes the Scheduler to crash (Assertion Error,
violation of single threaded property)
- Calls on the ExecutionGraph are removed as part of removing the legacy
scheduler. Certain calls do not work any more.
+Problem Level 1:+
The solution would be to pass in the scheduler and a main thread executor to
interact with it.
However, to do that the scheduler needs to be created before the
OperatorCoordinators are created. One could do that by creating the
Coordinators lazily after the Scheduler.
+Problem Level 2:+
The Scheduler restores the savepoints as part of the scheduler creation, when
the ExecutionGraph and the CheckpointCoordinator are created early in the
constructor.
(Side note: That design is tricky in itself, because it means state is restored
before the scheduler is even properly constructed.)
That means the OperatorCoordinator needs to exist (or an in placeholder
component needs to exist) to accept the restored state.
That brings us to a cyclic dependency:
- OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
- Scheduler and MainThreadExecutor need constructed ExecutionGraph
- ExecutionGraph needs CheckpointCoordinator
- CheckpointCoordinator needs OperatorCoordinator
+Breaking the Cycle+
The only way we can do this is with a form of lazy initialization:
- We eagerly create the OperatorCoordinators so they exist for state restore
- We provide an uninitialized context to them
- When the Scheduler is started (after leadership is granted) we initialize
the context with the (then readily constructed) Scheduler and MainThreadExecutor
+Longer-term Solution+
The longer term solution would require a major change in the Scheduler and
CheckpointCoordinator setup. Something like this:
- Scheduler (and ExecutionGraph) are constructed first
- JobMaster waits for leadership
- Upon leader grant, Operator Coordinators are constructed and can reference
the Scheduler and FencedMainThreadExecutor
- CheckpointCoordinator is constructed and references ExecutionGraph and
OperatorCoordinators
- Savepoint or latest checkpoint is restored
The implementation of the current should try to couple parts as loosely as
possible to make it easy to implement the above approach later.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)