Roman Khachatryan created FLINK-23204:
-----------------------------------------
Summary: Provide StateBackends access to MailboxExecutor
Key: FLINK-23204
URL: https://issues.apache.org/jira/browse/FLINK-23204
Project: Flink
Issue Type: Sub-task
Components: Runtime / State Backends, Runtime / Task
Reporter: Roman Khachatryan
Fix For: 1.14.0
StateBackends are assumed to be non-thread-safe and are accessed only from the
task thread.
ChangelogStateBackend has more asynchronous operations. In addition to the
usual methods, the task thread is needed for:
* DFS writer: collecting the changes uploaded so far; handling upload results
after completion
* ChangelogKeyedStateBackend: combining state handles when the writer
completes an upload
* ChangelogKeyedStateBackend: materialization, i.e. taking a snapshot (sync
phase) and handling the results of the async phase
Direct synchronization could be used instead, but executing the operations
above on the task thread would simplify the code (and likely improve
performance).
The only way to do this is via MailboxExecutor (because the task thread runs
mail actions in a loop until shutdown).
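A minimal sketch of the mailbox pattern described above, in plain Java and under stated assumptions: async work runs on another thread, and its completion is handed back as an action that the single task thread runs in its loop until shutdown. `SimpleMailbox`, `runDemo`, and the `handle(...)` strings are hypothetical illustrations, not Flink's `MailboxExecutor` API; the single-thread executor merely stands in for the DFS uploader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxSketch {

    /** Hypothetical minimal mailbox (NOT Flink's MailboxExecutor API). */
    static final class SimpleMailbox {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
        // Read and written only by the task thread (inside mails), so no locking needed.
        private boolean running = true;

        /** Thread-safe: any thread may enqueue an action for the task thread. */
        void execute(Runnable mail) {
            mails.add(mail);
        }

        /** Enqueues a mail that ends the loop once the task thread processes it. */
        void shutdown() {
            execute(() -> running = false);
        }

        /** Runs on the task thread: processes mails one at a time until shutdown. */
        void runMailboxLoop() throws InterruptedException {
            while (running) {
                mails.take().run();
            }
        }
    }

    /** Simulates async uploads whose results are combined on the task thread. */
    static List<String> runDemo() {
        SimpleMailbox mailbox = new SimpleMailbox();
        ExecutorService uploader = Executors.newSingleThreadExecutor();
        // Deliberately not thread-safe: only the task thread touches it.
        List<String> uploadedHandles = new ArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            String change = "change-" + i;
            pending.add(
                CompletableFuture
                    // Async phase on another thread (stands in for a DFS upload).
                    .supplyAsync(() -> "handle(" + change + ")", uploader)
                    // Hand the result back to the task thread instead of locking.
                    .thenAccept(h -> mailbox.execute(() -> uploadedHandles.add(h))));
        }
        // Stop the loop only after every completion action has been enqueued.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
            .thenRun(mailbox::shutdown);

        try {
            mailbox.runMailboxLoop(); // the calling thread plays the task thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("task thread interrupted", e);
        } finally {
            uploader.shutdown();
        }
        return uploadedHandles;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because every mutation of `uploadedHandles` happens on the loop thread, the list needs no synchronization, which is the kind of simplification the issue argues for.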
However, it is currently created in StreamTask, and its classes reside in
flink-streaming-java. So one subtask is to change its creation/lifecycle and
move the classes. The target location is flink-core (at least for interfaces)
and flink-runtime/flink-core (for implementations).
---
Another subtask is to actually expose it to state backends (this can be
extracted into a separate task).
StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager
argument which could be used.
However, Environment:
# is exposed to the user (via getContainingTask)
# has too wide a scope (e.g. InputGates are not needed in state backends)
# has too many responsibilities; this is also true of TaskStateManager, which
has e.g. reportIncompleteTaskStateSnapshots
There is probably a better way to expose it.
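One possible direction, sketched under loud assumptions: instead of Environment, a state backend could receive a narrow context that offers only task-thread execution. `StateBackendTaskContext`, `mailboxExecutor()`, and `ExampleChangelogBackend` are hypothetical names, and `java.util.concurrent.Executor` is only a stand-in for whatever MailboxExecutor interface ends up in flink-core.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

public class NarrowContextSketch {

    /**
     * Hypothetical narrow view for state backends: unlike Environment, it
     * exposes neither InputGates nor snapshot-reporting methods.
     */
    interface StateBackendTaskContext {
        /** Runs actions on the task thread (Executor is a stand-in type). */
        Executor mailboxExecutor();
    }

    /** Hypothetical backend that only ever sees the narrow context. */
    static final class ExampleChangelogBackend {
        private final Executor taskThread;
        private int combinedHandles; // mutated only via the task-thread executor

        ExampleChangelogBackend(StateBackendTaskContext context) {
            this.taskThread = context.mailboxExecutor();
        }

        /** May be called from an upload thread; defers the mutation to the task thread. */
        void onUploadCompleted() {
            taskThread.execute(() -> combinedHandles++);
        }

        int combinedHandles() {
            return combinedHandles;
        }
    }

    /** Single-threaded test double: queue mails, then drain them as the "task thread". */
    static int runDemo() {
        Queue<Runnable> mails = new ArrayDeque<>();
        Executor enqueue = mails::add;
        StateBackendTaskContext context = () -> enqueue;
        ExampleChangelogBackend backend = new ExampleChangelogBackend(context);

        backend.onUploadCompleted();
        backend.onUploadCompleted();
        // Nothing has been combined yet; the mails run only when drained below.
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
        return backend.combinedHandles();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

A narrow interface like this would also make backends easy to test with a synchronous or queued executor, without constructing a full Environment.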
---
Note that MailboxExecutor will likely be used in other places in the future,
such as ProcessFunction.