[ 
https://issues.apache.org/jira/browse/FLINK-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17066771#comment-17066771
 ] 

Piotr Nowojski commented on FLINK-16645:
----------------------------------------

I've assigned the ticket to you [~wind_ljy]. Technically speaking, I think you 
are right: this option is relevant only for the 
{{ChannelSelectorRecordWriter}}. However, I suspect the easiest solution may 
still be a general one that works the same way for both of them. 

Can you share how you were planning to implement this feature?

Please keep in mind that the network stack code has evolved since last year, and 
it's important to implement this feature in a non-blocking way: the code 
should avoid blocking inside {{#emit(...)}} calls. For example, take a 
look at the existing method 
{{org.apache.flink.runtime.io.network.api.writer.RecordWriter#getAvailableFuture}}.
As in the description, we were thinking about hooking the data skew backlog 
limit check somewhere into this method (more precisely, probably into 
{{ResultPartition#getAvailableFuture}}): just mark the record writer/result 
partition as unavailable if the backlog limit is exceeded in one of the 
subpartitions.
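
To make the idea above more concrete, here is a rough, self-contained sketch of 
such an availability check. It does not use Flink's actual classes: 
{{BacklogLimitedPartition}}, {{onBufferAdded}}, {{onBufferConsumed}} and the 
limit parameter are hypothetical names made up for illustration, and only the 
{{getAvailableFuture}} name mirrors the existing method. The assumption is that 
the partition counts queued buffers per subpartition and hands out an 
uncompleted future while any subpartition is above the limit.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a result partition; NOT Flink's real implementation. */
final class BacklogLimitedPartition {
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final int[] backlogs;      // queued buffers per subpartition
    private final int maxBacklog;      // assumed config value (hypothetical)
    private int overLimitCount;        // subpartitions currently above the limit
    private CompletableFuture<Void> availableFuture = AVAILABLE;

    BacklogLimitedPartition(int numSubpartitions, int maxBacklog) {
        if (numSubpartitions <= 0 || maxBacklog < 0) {
            throw new IllegalArgumentException("invalid partition configuration");
        }
        this.backlogs = new int[numSubpartitions];
        this.maxBacklog = maxBacklog;
    }

    /** Callers poll or chain on this future instead of blocking inside emit. */
    synchronized CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    /** Called when a buffer is queued in the given subpartition. */
    synchronized void onBufferAdded(int subpartition) {
        backlogs[subpartition]++;
        // The subpartition has just crossed the limit.
        if (backlogs[subpartition] == maxBacklog + 1 && overLimitCount++ == 0) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Called when the consumer takes a buffer from the given subpartition. */
    void onBufferConsumed(int subpartition) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            if (backlogs[subpartition] == 0) {
                throw new IllegalStateException("no buffer to consume");
            }
            backlogs[subpartition]--;
            // The subpartition has just dropped back to the limit.
            if (backlogs[subpartition] == maxBacklog && --overLimitCount == 0) {
                toComplete = availableFuture;
                availableFuture = AVAILABLE;
            }
        }
        // Complete outside the lock so chained callbacks do not run under it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

With this shape the writer never waits inside the emit path; the task would 
simply stop processing input while the returned future is not done, and resume 
once every subpartition is back within the limit.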


> Limit the maximum backlogs in subpartitions for data skew case
> --------------------------------------------------------------
>
>                 Key: FLINK-16645
>                 URL: https://issues.apache.org/jira/browse/FLINK-16645
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Zhijiang
>            Assignee: Jiayi Liao
>            Priority: Major
>             Fix For: 1.11.0
>
>
> In the case of data skew, most of the buffers in the partition's LocalBufferPool 
> are probably requested and accumulated in a certain subpartition, which 
> increases the in-flight data and slows down barrier alignment.
> We can set up a proper config to control how many backlogs are allowed for 
> one subpartition. If one subpartition reaches this threshold, it will make 
> the buffer pool unavailable, which blocks task processing continuously. Then 
> we can reduce the in-flight data to speed up the checkpoint process a bit 
> without impacting performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
