[ 
https://issues.apache.org/jira/browse/FLINK-15981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17035052#comment-17035052
 ] 

zhijiang commented on FLINK-15981:
----------------------------------

[~sewen] You raised some really good and critical questions. I'll try to share 
some of my previous thoughts, though probably not very thoroughly.

> Is this only about the "read side", or do we also need to make adjustments to 
> the "write side"?

This issue is more serious on the read side than on the write side, so it is 
better to solve the read side as a first step. The write side already makes 
use of the `LocalBufferPool`, which should be considered in the 
`maxDirectMemory` setting.

Of course we can also improve the write side if necessary, by reducing the 
number of `MemorySegment`s in the `LocalBufferPool`. Currently the core number 
of segments is equal to the number of subpartitions. We once implemented a 
`MergePartition` in Blink that uses a fixed amount of memory for writing the 
data from all subpartitions. Once this portion of memory is full, it is 
flushed to disk, triggering a further merge among the different portions. This 
saves memory because the portion size is not tied to the job scale, and it can 
also bring performance benefits in some scenarios.
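To illustrate the idea, here is a minimal sketch of a fixed-size shared write 
buffer that spills to disk when full. The class name and structure are 
hypothetical (the actual Blink `MergePartition` is not public); it only shows 
why the memory footprint stays constant regardless of the number of 
subpartitions:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: one fixed-size buffer shared by all subpartitions.
// When the buffer is full, its contents are spilled to disk, so direct
// memory usage is bounded by the buffer capacity, not by the job scale.
class FixedSpillingWriter implements AutoCloseable {
    private final ByteBuffer buffer;
    private final FileChannel channel;

    FixedSpillingWriter(Path spillFile, int capacityBytes) throws IOException {
        this.buffer = ByteBuffer.allocateDirect(capacityBytes);
        this.channel = FileChannel.open(spillFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    void write(byte[] record) throws IOException {
        if (record.length > buffer.remaining()) {
            spill(); // the shared portion is full: flush it to disk first
        }
        buffer.put(record);
    }

    private void spill() throws IOException {
        buffer.flip();
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        buffer.clear();
    }

    @Override
    public void close() throws IOException {
        spill(); // flush any remaining data before closing
        channel.close();
    }
}
```

(A real implementation would also merge the spilled portions afterwards, 
which is omitted here.)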

 

For the improvement on the read side, there are several options, which you 
actually also mentioned.

> Should there be one in Netty, shared across all readers? In that case, when 
> would it be created / destroyed, and how much memory would it get?

It is technically feasible to obtain this buffer for reading data from the 
Netty stack. Then we still have two options: one buffer per Netty channel, or 
one buffer per Netty thread.

One Netty thread may be responsible for reading data from multiple Netty 
channels, and we can rely on the assumption that a Netty thread does not 
continue reading data while the previous buffer has not been recycled (in the 
case where the socket is not writable and the flush operation is pending).

So one Netty thread requires at most one buffer, and the total number of 
required buffers is at most the number of Netty threads. Since the number of 
Netty threads is limited, this would greatly reduce the total memory overhead, 
because reading currently requires 2 buffers per subpartition.
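The per-thread bound can be sketched with a `ThreadLocal` standing in for 
Netty's event-loop-local state (the names here are illustrative, not actual 
Netty or Flink APIs): no matter how many channels a thread serves, it only 
ever holds one reusable direct buffer.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of "one buffer per (event-loop) thread": each reader
// thread lazily allocates a single reusable direct buffer, so total direct
// memory is bounded by the thread count, not by the number of channels or
// subpartitions being read.
class PerThreadReadBuffer {
    static final int BUFFER_SIZE = 64 * 1024; // 64KB, as in FileBufferReader

    private static final ThreadLocal<ByteBuffer> BUFFER =
            ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(BUFFER_SIZE));

    static ByteBuffer get() {
        ByteBuffer buf = BUFFER.get();
        buf.clear(); // safe to reuse: the previous read was fully consumed
        return buf;
    }
}
```

With, say, 2 reader threads and 1000 subpartitions, only 2 x 64KB of direct 
memory is ever allocated, versus 2 buffers per subpartition today.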

> Would there be one pool per "producing task"? In that case, when would that 
> pool get destroyed (memory returned)? Especially with the new cluster 
> partitions, the data can live much longer than any tasks (cached for later / 
> for recovery).

I think this is another possible option to focus on. The `LocalBufferPool` on 
the write side is currently destroyed after the task finishes, which does not 
seem very reasonable. We already decoupled the lifecycles of task and 
partition via partition lifecycle management. The `LocalBufferPool` should 
belong to the partition component, so it should be up to the partition when to 
release it; the pool's lifecycle need not be coupled with the task lifecycle. 
If this assumption makes sense, we can reuse this pool for the read side later.

Of course it has the downside of occupying global buffer resources for a long 
time if the partition's lifecycle is long, as you mentioned. But this can be 
justified by the concept/semantics (if a partition is not yet released, it 
should occupy resources), even though it may not be very efficient. From this 
point of view, option 1 above might be a better choice.

> Or one local pool per reader? In that case, what do we do if the number of 
> network buffers is not sufficient? Wait until more buffers become available?

I guess this is motivated by different subpartitions having different 
lifecycles, i.e. one pool per reader for fine-grained usage/control? If so, I 
think that is a further improvement; first we should confirm which option to 
focus on. If this portion of buffers comes from the Netty stack, it is always 
available. If it comes from the write-side pool mentioned above, it remains 
available as long as that pool is not destroyed.

No matter which option we take, this portion of memory should be exposed 
explicitly via some interface, so that the framework can fetch it and take it 
into account in the `MaxDirectMemory` setting.
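As a rough sketch of such an interface (all names here are hypothetical, not 
existing Flink APIs), any component holding direct memory could report its 
footprint, and the framework would sum these when computing the direct memory 
budget:

```java
import java.util.Arrays;

// Hypothetical reporting interface: components that hold direct (off-heap)
// memory expose their footprint so the framework can account for it when
// deriving the -XX:MaxDirectMemorySize value. Not an actual Flink API.
interface DirectMemoryReporter {
    long directMemoryBytes();
}

class DirectMemoryAccounting {
    // Sum the reported footprints on top of a base budget.
    static long maxDirectMemory(long baseBytes, Iterable<DirectMemoryReporter> reporters) {
        long total = baseBytes;
        for (DirectMemoryReporter r : reporters) {
            total += r.directMemoryBytes();
        }
        return total;
    }
}
```

For example, with option 1 above, the Netty-side reporter would return 
(number of Netty threads) x 64KB, which the framework adds to the configured 
off-heap budget instead of asking users to guess it.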

 

> Control the direct memory in FileChannelBoundedData.FileBufferReader
> --------------------------------------------------------------------
>
>                 Key: FLINK-15981
>                 URL: https://issues.apache.org/jira/browse/FLINK-15981
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.10.0
>            Reporter: Jingsong Lee
>            Priority: Critical
>             Fix For: 1.10.1, 1.11.0
>
>
> Currently, the default blocking BoundedData is FileChannelBoundedData. Its 
> reader creates a new 64KB direct buffer.
> When parallelism is greater than 100, users need to configure 
> "taskmanager.memory.task.off-heap.size" to avoid direct memory OOM. It is 
> hard to configure, and it costs a lot of memory. Consider 1000 parallelism: 
> we may need 1GB+ for a task manager.
> This is not conducive to scenarios with few slots and large parallelism. 
> Batch jobs could run little by little, but the memory shortage would cost a 
> lot.
> If we provide N-input operators, things may get worse, since the number of 
> subpartitions that can be requested at the same time will be larger. We have 
> no idea how much memory that takes.
> Here are my rough thoughts:
>  * Obtain memory from network buffers.
>  * Provide "the maximum number of subpartitions that can be requested at the 
> same time".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
