Roman Khachatryan created FLINK-23204:
-----------------------------------------
             Summary: Provide StateBackends access to MailboxExecutor
                 Key: FLINK-23204
                 URL: https://issues.apache.org/jira/browse/FLINK-23204
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / State Backends, Runtime / Task
            Reporter: Roman Khachatryan
             Fix For: 1.14.0


StateBackends are assumed to be not thread-safe and accessed from the task thread only.

In ChangelogStateBackend, there are (more) async operations. In addition to the usual methods, the task thread is needed for:
* DFS writer: collect the changes uploaded so far; handle upload results after completion
* ChangelogKeyedStateBackend: combine state handles upon upload completion by the writer
* ChangelogKeyedStateBackend: materialization - take the snapshot (sync phase); handle the results of the async phase

Direct synchronization can be used instead, but executing the operations above in the task thread would simplify the code (and likely improve performance).

The only way to do this is via MailboxExecutor (because the task thread runs mail actions in a loop until shutdown).

However, it is currently created in StreamTask, and its classes reside in flink-streaming-java. So one subtask is to change its creation/lifecycle and move the classes. The target location is flink-core (at least for interfaces) and flink-runtime/flink-core (for implementations).

---
Another subtask is to actually expose it to state backends (can be extracted to a separate task).

StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager argument which can be used. However, Environment
# is available to the user (via getContainingTask)
# has too wide a scope (e.g. InputGates are not needed in state backends)
# has too many responsibilities - also true for TaskStateManager, which has e.g. reportIncompleteTaskStateSnapshots

Probably, there is a better way to expose it.

---
Note that MailboxExecutor will likely be used in the future in other places, such as ProcessFunction.
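
For illustration only, here is a minimal sketch of the hand-off this issue is about: work completes on another thread, and the result is handled on the task thread by enqueueing a mail. It uses a toy queue-based mailbox and is not Flink's MailboxExecutor API; the names Mailbox, UploadTracker, onUploadCompleted and runDemo are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the mailbox pattern; NOT Flink's MailboxExecutor API. */
public class MailboxSketch {

    /** Hypothetical stand-in for a mailbox: a queue drained by a single thread. */
    static final class Mailbox {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
        private volatile boolean running = true;

        /** Callable from any thread; the action runs later on the loop thread. */
        void execute(Runnable mail) {
            mails.add(mail);
        }

        /** Runs mail actions on the calling thread until shutdown. */
        void runLoop() throws InterruptedException {
            while (running) {
                mails.take().run();
            }
        }

        /** Enqueues a mail that stops the loop, so mails enqueued earlier still run. */
        void shutdown() {
            execute(() -> running = false);
        }
    }

    /** Hypothetical backend-side state; not thread-safe, so only the task thread touches it. */
    static final class UploadTracker {
        final List<String> completedHandles = new ArrayList<>();
        final List<String> handlerThreads = new ArrayList<>();

        void onUploadCompleted(String handle) {
            completedHandles.add(handle);
            handlerThreads.add(Thread.currentThread().getName());
        }
    }

    static UploadTracker runDemo() {
        Mailbox mailbox = new Mailbox();
        UploadTracker tracker = new UploadTracker();

        // Simulated async upload; its result is handed back to the task thread as a mail.
        CompletableFuture
                .supplyAsync(() -> "handle-1")
                .thenAccept(handle -> {
                    mailbox.execute(() -> tracker.onUploadCompleted(handle));
                    mailbox.shutdown();
                });

        // The "task thread" does nothing but process mails until shutdown.
        Thread taskThread = new Thread(() -> {
            try {
                mailbox.runLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");
        taskThread.start();
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task thread", e);
        }
        return tracker;
    }

    public static void main(String[] args) {
        UploadTracker tracker = runDemo();
        System.out.println(tracker.completedHandles + " handled on " + tracker.handlerThreads);
    }
}
```

Because every mutation of UploadTracker happens inside a mail, the tracker needs no locks, which is the simplification over direct synchronization that the description argues for.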