[ 
https://issues.apache.org/jira/browse/FLINK-33668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33668:
-----------------------------------
    Labels: pull-request-available  (was: )

> Decoupling Shuffle network memory and job topology
> --------------------------------------------------
>
>                 Key: FLINK-33668
>                 URL: https://issues.apache.org/jira/browse/FLINK-33668
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> With FLINK-30469 and FLINK-31643, we decoupled shuffle network memory from
> task parallelism by limiting the number of buffers for each InputGate and
> ResultPartition in Hybrid Shuffle. However, when too many shuffle tasks run
> simultaneously on the same TaskManager, "Insufficient number of network
> buffers" errors can still occur. This usually happens when slot sharing
> groups are enabled or a TaskManager contains multiple slots.
> We want to make sure that a TaskManager does not hit "Insufficient number
> of network buffers" errors even when dozens of InputGates and
> ResultPartitions run on it simultaneously. I have given this some thought,
> and here is my rough proposal.
> 1. InputGate and ResultPartition request buffers only from their
> LocalBufferPool, which means that InputGate no longer requests exclusive
> buffers from NetworkBufferPool directly.
> 2. When creating a LocalBufferPool, we need to specify the minimum, maximum,
> and expected number of buffers. Whenever a LocalBufferPool is created or
> destroyed, a redistribution occurs that adjusts the buffer count of all
> LocalBufferPools, using the expected value as a weight and keeping each pool
> between its minimum and maximum values. According to testing, the minimum
> value can be set to 4, which keeps the Flink job working, although
> performance may be lower. With this minimum value, a task with 20 shuffle
> edges needs only 5MB of memory to avoid the "Insufficient number of network
> buffers" error.
> 3. At runtime, InputGate and ResultPartition each calculate how many buffers
> their internal data structures (such as the exclusive buffer queue of
> InputGate and the BufferAccumulator of ResultPartition) may use, based on
> the pool size of their corresponding LocalBufferPool.
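
The redistribution described in step 2 could be sketched roughly as follows. This is only an illustration under assumptions and not Flink's actual implementation: the class BufferRedistributionSketch, the PoolSpec type, and the redistribute method are hypothetical names, and the rounding and capping strategy is one possible choice. Every pool first receives its minimum; the remaining buffers are then handed out in proportion to the expected value, and whatever a pool cannot take because of its maximum is offered to the other pools again.

```java
import java.util.Arrays;

class BufferRedistributionSketch {

    /** Hypothetical per-pool requirement; not an actual Flink class. */
    static final class PoolSpec {
        final int min;
        final int expected; // used as the weight; assumed to be >= 1
        final int max;      // assumed to be >= min

        PoolSpec(int min, int expected, int max) {
            this.min = min;
            this.expected = expected;
            this.max = max;
        }
    }

    /** Returns one pool size per spec, each within [min, max]. */
    static int[] redistribute(int totalBuffers, PoolSpec[] pools) {
        int[] sizes = new int[pools.length];
        long remaining = totalBuffers;

        // Step 1: every pool is guaranteed its minimum.
        for (int i = 0; i < pools.length; i++) {
            sizes[i] = pools[i].min;
            remaining -= pools[i].min;
        }
        if (remaining < 0) {
            throw new IllegalStateException("Insufficient number of network buffers");
        }

        // Step 2: hand out the rest, weighted by the expected value, until
        // nothing is left or every pool has reached its maximum.
        while (remaining > 0) {
            long weightSum = 0;
            for (int i = 0; i < pools.length; i++) {
                if (sizes[i] < pools[i].max) {
                    weightSum += pools[i].expected;
                }
            }
            if (weightSum == 0) {
                break; // all pools are at their maximum
            }
            long budget = remaining;
            long handedOut = 0;
            for (int i = 0; i < pools.length; i++) {
                if (sizes[i] >= pools[i].max) {
                    continue;
                }
                long share = budget * pools[i].expected / weightSum;
                long grant = Math.min(share, (long) pools[i].max - sizes[i]);
                sizes[i] += grant;
                handedOut += grant;
            }
            if (handedOut == 0) {
                // Integer rounding left fewer buffers than open pools:
                // give the leftovers away one buffer at a time.
                for (int i = 0; i < pools.length && remaining > 0; i++) {
                    if (sizes[i] < pools[i].max) {
                        sizes[i]++;
                        remaining--;
                    }
                }
                break;
            }
            remaining -= handedOut;
        }
        return sizes;
    }

    public static void main(String[] args) {
        PoolSpec[] pools = {
            new PoolSpec(4, 10, 1000),
            new PoolSpec(4, 30, 1000)
        };
        System.out.println(Arrays.toString(redistribute(100, pools)));
    }
}
```

In this sketch, two pools with expected values 10 and 30 that share 100 buffers end up with 27 and 73 buffers. If the first pool's maximum were 20 instead, it would stop at 20 and the second pool would receive the other 80.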



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
