Roman Khachatryan created FLINK-23204:
-----------------------------------------

             Summary: Provide StateBackends access to MailboxExecutor
                 Key: FLINK-23204
                 URL: https://issues.apache.org/jira/browse/FLINK-23204
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / State Backends, Runtime / Task
            Reporter: Roman Khachatryan
             Fix For: 1.14.0


StateBackends are assumed not to be thread-safe and to be accessed only from the 
task thread.

ChangelogStateBackend involves more asynchronous operations. In addition to the 
usual methods, the task thread is needed for:
 * DFS writer: collect the changes uploaded so far; handle upload results after 
completion
 * ChangelogKeyedStateBackend: combine state handles once the writer completes 
an upload
 * ChangelogKeyedStateBackend: materialization, i.e. take a snapshot (sync 
phase) and handle the results of the async phase
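As a rough illustration of the pattern these items share, the Java sketch simulates uploads on a background pool and applies their results on a single thread that stands in for the task thread. It is a minimal sketch that assumes a plain ExecutorService as a stand-in for the mailbox. UploadCompletionSketch, runSketch and the "handle-N" values are hypothetical and are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical sketch (not Flink code): "uploads" run on an IO pool, but their
 * results are applied on one "task thread" executor standing in for the mailbox.
 */
public class UploadCompletionSketch {

    static final String TASK_THREAD_NAME = "task-thread";

    static List<String> runSketch(int uploads) {
        // Single thread standing in for the task thread and its mailbox.
        ExecutorService taskThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, TASK_THREAD_NAME));
        // Background pool standing in for the DFS uploader.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);

        // Deliberately NOT thread-safe: only the task thread may mutate it.
        List<String> uploadedHandles = new ArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < uploads; i++) {
                final int id = i;
                pending.add(CompletableFuture
                        // "Upload" a batch of changes on an IO thread.
                        .supplyAsync(() -> "handle-" + id, ioPool)
                        // Hand the result back to the task thread instead of taking a lock.
                        .thenAcceptAsync(handle -> {
                            if (!TASK_THREAD_NAME.equals(Thread.currentThread().getName())) {
                                throw new IllegalStateException("not on the task thread");
                            }
                            uploadedHandles.add(handle);
                        }, taskThread));
            }
            // Wait until every result has been applied; join() also makes the
            // task thread's writes to the list visible to the caller.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            ioPool.shutdown();
            taskThread.shutdown();
        }
        return uploadedHandles;
    }

    public static void main(String[] args) {
        System.out.println(runSketch(10).size()); // prints 10
    }
}
```

The handles arrive in completion order, which may vary between runs. The point is that the list is only ever mutated by the one task thread.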

Direct synchronization could be used instead, but executing the above on the 
task thread would simplify the code (and likely improve performance).

The only way to do this is via MailboxExecutor (because the task thread runs 
mail actions in a loop until shutdown).
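To make the "runs mail actions in a loop" idea concrete, here is a heavily simplified, hypothetical mailbox in Java. SimpleMailbox, runMailboxLoop and demo are illustrative names only. This is not Flink's MailboxExecutor/MailboxProcessor API, and it leaves out error handling, yielding and priorities.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical, heavily simplified mailbox (not Flink's MailboxExecutor).
 * Any thread may enqueue mails; only the task thread runs them, one at a time.
 */
public class SimpleMailbox implements Executor {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
    // Only read and written by the task thread (shutdown itself is a mail).
    private boolean running = true;

    /** May be called from any thread; the action runs later on the task thread. */
    @Override
    public void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Enqueues a "poison" mail, so every mail submitted before it still runs. */
    public void shutdown() {
        execute(() -> running = false);
    }

    /** The task thread's loop: take mails in FIFO order and run them until shutdown. */
    public void runMailboxLoop() throws InterruptedException {
        while (running) {
            mails.take().run();
        }
    }

    /** Demo: producer threads submit increments; only the task thread touches the counter. */
    static int demo(int producers, int mailsPerProducer) {
        SimpleMailbox mailbox = new SimpleMailbox();
        int[] counter = {0}; // unsynchronized on purpose
        Thread taskThread = new Thread(() -> {
            try {
                mailbox.runMailboxLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");
        taskThread.start();

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < mailsPerProducer; i++) {
                    mailbox.execute(() -> counter[0]++);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            mailbox.shutdown(); // enqueued after every increment mail
            taskThread.join();  // makes the task thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Because shutdown is itself a mail, every action enqueued before it still runs. The counter needs no lock because only the task thread ever touches it.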

 

However, MailboxExecutor is currently created in StreamTask, and its classes 
reside in flink-streaming-java. So one subtask is to change its 
creation/lifecycle and move the classes. The target location is flink-core (at 
least for the interfaces) and flink-runtime or flink-core (for the 
implementations).

 

---

Another subtask is to actually expose it to state backends (this can be 
extracted into a separate task).

StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager 
argument, which could be used.

However, Environment
 # is available to the user (via getContainingTask)
 # has too wide a scope (e.g. InputGates are not needed in state backends)
 # has too many responsibilities; this is also true of TaskStateManager, which 
has e.g. reportIncompleteTaskStateSnapshots

There is probably a better way to expose it.
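Purely for illustration, one possible shape for a narrower exposure is a small context object that gives a state backend only an executor bound to the task thread. StateBackendTaskContext, WideTaskEnvironment and HypotheticalChangelogBackend are invented names, not existing Flink types. The direct executor (Runnable::run) is only there to keep the demo single-threaded.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: hand state backends a narrow context exposing only the
 * mailbox executor instead of the whole Environment. None of these types exist in Flink.
 */
public class NarrowContextSketch {

    /** Hypothetical narrow view given to state backends. */
    interface StateBackendTaskContext {
        /** Actions submitted here are expected to run on the task thread. */
        Executor getMailboxExecutor();
    }

    /** Hypothetical stand-in for a wide, task-level object such as Environment. */
    static final class WideTaskEnvironment {
        private final Executor mailboxExecutor;
        // Input gates, metrics, snapshot reporting, etc. would also live here.

        WideTaskEnvironment(Executor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        /** Exposes only the mailbox executor, nothing else. */
        StateBackendTaskContext forStateBackend() {
            return () -> mailboxExecutor;
        }
    }

    /** Hypothetical backend that hands async results back to the task thread. */
    static final class HypotheticalChangelogBackend {
        private final StateBackendTaskContext context;

        HypotheticalChangelogBackend(StateBackendTaskContext context) {
            this.context = context;
        }

        void onMaterializationCompleted(Runnable handleResult) {
            context.getMailboxExecutor().execute(handleResult);
        }
    }

    /** Wires the pieces together with a direct executor and reports whether the result was handled. */
    static boolean demo() {
        WideTaskEnvironment env = new WideTaskEnvironment(Runnable::run);
        HypotheticalChangelogBackend backend =
                new HypotheticalChangelogBackend(env.forStateBackend());
        AtomicBoolean handled = new AtomicBoolean(false);
        backend.onMaterializationCompleted(() -> handled.set(true));
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

The design goal illustrated is that the backend can reach the mailbox but not the input gates or task-level reporting methods that Environment and TaskStateManager also carry.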

 

---

Note that MailboxExecutor will likely be used in the future in other places, 
such as ProcessFunction.


