[ 
https://issues.apache.org/jira/browse/FLINK-23204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385216#comment-17385216
 ] 

Yuan Mei commented on FLINK-23204:
----------------------------------

merged commit 
[{{6709573}}|https://github.com/apache/flink/commit/67095737989e5196ca713fedf416f81545235d29]
 into master.

> Provide StateBackends access to MailboxExecutor
> -----------------------------------------------
>
>                 Key: FLINK-23204
>                 URL: https://issues.apache.org/jira/browse/FLINK-23204
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends, Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Yuan Mei
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> StateBackends are assumed to be not-thread-safe and accessed from the task 
> thread only.
> In ChangelogStateBackend, there are (more) async operations. In addition to 
> the usual methods, task thread is needed for:
>  * DFS writer: collect so far uploaded changes; handle upload results after 
> completion
>  * ChangelogKeyedStateBackend: combining state handles upon upload completion 
> by writer
>  * ChangelogKeyedStateBackend: materialization - take snapshot (sync phase); 
> handle results of the async phase 
> Direct synchronization can be used instead, but executing ^^^ by the Task 
> thread would simpilfy the code (and ilkely improve performance).
> The only way to do this is via MailboxExecutor (because task thread runs mail 
> actions in a loop until shutdown).
>  
> However, it is currently created in StreamTask and classes reside in 
> flink-streaming-java. So one subtask is to change creation/lifecycle and move 
> the classes. The location is flink-core (at least for interfaces) and 
> flink-runtime/flink-core (for implementations).
>  
> —
> Another subtask is to actually expose Executor to state backends (can be 
> extracted into a separate task).
> StateBackend.createKeyedStateBackend already has Environment/TaskStateManager 
> argument which can be used.
> However, Environment
>  # is available to the user (via getContainingTask)
>  # has too wide scope (e.g. InputGates not needed in state backends)
>  # has too many responsibilities - also true for TaskStateManager which has 
> e.g. reportIncompleteTaskStateSnapshots
> Probably, there is a better way to expose it.
>  
> —
> Note that MailboxExecutor will likely be used in future in other places like 
> ProcessFunction.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to