[ https://issues.apache.org/jira/browse/FLINK-33668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo resolved FLINK-33668.
--------------------------------
    Fix Version/s: 1.20.0
                       (was: 1.19.0)
       Resolution: Done

master (1.20) via 4171b980717e5dc313d781210a5bbfd35449ee50.

> Decoupling Shuffle network memory and job topology
> --------------------------------------------------
>
>                 Key: FLINK-33668
>                 URL: https://issues.apache.org/jira/browse/FLINK-33668
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>
> With FLINK-30469 and FLINK-31643, we decoupled the shuffle network memory
> from task parallelism by limiting the number of buffers for each InputGate
> and ResultPartition in Hybrid Shuffle. However, when a task contains
> multiple ResultPartitions, or many shuffle tasks run simultaneously on the
> same TaskManager, "Insufficient number of network buffers" errors can still
> occur. This usually happens when slot sharing groups are used or a
> TaskManager has multiple slots.
> We want to ensure that a TaskManager never hits "Insufficient number of
> network buffers", even when dozens of InputGates and ResultPartitions run on
> it simultaneously. I have given this some thought, and here is my rough
> proposal.
> 1. InputGates and ResultPartitions request buffers only from their
> LocalBufferPool; an InputGate will no longer request exclusive buffers
> directly from the NetworkBufferPool.
> 2. When creating a LocalBufferPool (LBP), we specify its minimum, maximum,
> and expected number of buffers. Whenever an LBP is created or destroyed, the
> buffers are redistributed: the size of every LocalBufferPool is adjusted
> using its expected value as a weight, clamped between its minimum and
> maximum. According to our tests, a minimum of 4 buffers is enough for a
> Flink job to run, possibly with lower performance. With this minimum, a task
> with 20 shuffle edges needs only 5MB of memory to avoid the "Insufficient
> number of network buffers" error.
> 3. At runtime, InputGates and ResultPartitions derive the number of buffers
> used by their internal data structures, such as the InputGate's exclusive
> buffer queue and the ResultPartition's BufferAccumulator, from the size of
> their LocalBufferPool.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
