[ 
https://issues.apache.org/jira/browse/FLINK-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17068867#comment-17068867
 ] 

Jiayi Liao commented on FLINK-16645:
------------------------------------

[~pnowojski]  Agreed. This is a very performance-sensitive change. Should we 
add a backpressure benchmark, for example by adding a {{sleep(0.1s)}} in 
the sink? That way we can measure the impact of this change. (The 
unaligned-checkpoint feature might be able to use it as well.)
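
To make the idea concrete, here is a minimal plain-Java sketch of what "a slow sink causes backpressure" means. It does not use Flink or its benchmark harness: the class {{SlowSinkBackpressureDemo}}, the method {{run}} and all parameters are hypothetical names, and the bounded queue merely stands in for the network buffers between the producer and the sink.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a benchmark: a producer feeding a sink that sleeps per record.
final class SlowSinkBackpressureDemo {

    // Sends `records` elements through a bounded queue and returns the elapsed time in ms.
    static long run(int records, int queueCapacity, long sinkDelayMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);

        // The "sink": takes one record at a time and sleeps to simulate slow processing.
        Thread sink = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    queue.take();
                    Thread.sleep(sinkDelayMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        long start = System.nanoTime();
        sink.start();
        try {
            for (int i = 0; i < records; i++) {
                // put() blocks once the queue is full, so the producer is backpressured.
                queue.put(i);
            }
            sink.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

Comparing the elapsed time (or the number of queued elements when a checkpoint barrier would be injected) with and without a backlog limit is the kind of measurement such a benchmark would provide.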

As for counting how many sub-partitions exceed the max backlog, it might look 
something like this:
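{code:java}
// Number of sub-partitions whose backlog currently exceeds maxBacklog.
int counter = 0;

// backlog is assumed to be the sub-partition's value after it grew by one.
void notifyIncreaseBacklog(int backlog, int subpartitionIndex) {
    if (backlog == maxBacklog + 1) {
        counter++;
    }
}

// backlog is assumed to be the sub-partition's value after it shrank by one.
void notifyDecreaseBacklog(int backlog, int subpartitionIndex) {
    if (backlog == maxBacklog) {
        counter--;
    }
}
{code}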
This relies on the fact that a backlog only ever increases or decreases by 
one at a time; otherwise we would need to remember every sub-partition's 
backlog, which does not look good to me.
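
For illustration, the counter could be wrapped in a small self-contained class like the one below. This is only a sketch under assumptions: {{BacklogLimitTracker}}, {{isAvailable}} and {{exceedingCount}} are hypothetical names rather than existing Flink classes or methods, the {{backlog}} argument is taken to be the value after the change, and the {{synchronized}} methods assume the notifications may come from different threads.

```java
// Hypothetical sketch, not Flink code: tracks how many sub-partitions exceed the limit.
final class BacklogLimitTracker {
    private final int maxBacklog;

    // Number of sub-partitions whose backlog is currently above maxBacklog.
    private int exceedingCount = 0;

    BacklogLimitTracker(int maxBacklog) {
        this.maxBacklog = maxBacklog;
    }

    // Call after a sub-partition's backlog grew by exactly one to `backlog`.
    synchronized void notifyIncreaseBacklog(int backlog) {
        if (backlog == maxBacklog + 1) {
            exceedingCount++; // this sub-partition just crossed the limit
        }
    }

    // Call after a sub-partition's backlog shrank by exactly one to `backlog`.
    synchronized void notifyDecreaseBacklog(int backlog) {
        if (backlog == maxBacklog) {
            exceedingCount--; // this sub-partition just dropped back to the limit
        }
    }

    // The buffer pool would be treated as available only if no sub-partition exceeds the limit.
    synchronized boolean isAvailable() {
        return exceedingCount == 0;
    }

    synchronized int exceedingCount() {
        return exceedingCount;
    }
}
```

Because each crossing of the threshold is counted exactly once in each direction, no per-sub-partition state is needed; this only holds as long as every change is reported and the step size is one.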

> Limit the maximum backlogs in subpartitions for data skew case
> --------------------------------------------------------------
>
>                 Key: FLINK-16645
>                 URL: https://issues.apache.org/jira/browse/FLINK-16645
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Zhijiang
>            Assignee: Jiayi Liao
>            Priority: Major
>             Fix For: 1.11.0
>
>
> In the case of data skew, most of the buffers in partition's LocalBufferPool 
> are probably requested away and accumulated in certain subpartition, which 
> would increase in-flight data to slow down the barrier alignment.
> We can set up a proper config to control how many backlogs are allowed for 
> one subpartition. If one subpartition reaches this threshold, it will make 
> the buffer pool unavailable which blocks task processing continuously. Then 
> we can reduce the in-flight data for speeding up checkpoint process a bit and 
> not impact on the performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
