[
https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842043#comment-16842043
]
Piotr Nowojski commented on FLINK-12529:
----------------------------------------
I think this is a valid improvement, and it doesn't look too complicated. The
only issue I can see is that the related code is currently being changed in a
couple of different efforts (TwoInputStreamSelectable and, for example, my PR
[https://github.com/apache/flink/pull/8467] ).
But as far as I understand, you would only need to modify the
{{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this
change would be based on my PR?).
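To make the idea concrete, here is a minimal, self-contained sketch of releasing a
channel's deserializer as soon as its end-of-partition event arrives. It is not
Flink code: {{ChannelDeserializer}}, {{EventType}} and {{SketchInputProcessor}} are
hypothetical stand-ins, and the real {{processBufferOrEvent}} signature and the
deserializer API may well differ.

```java
// Hypothetical stand-in for a per-channel record deserializer that holds a
// growable buffer for records spanning several network buffers.
final class ChannelDeserializer {
    private byte[] spanningBuffer = new byte[0];

    // Grow the buffer, mimicking the arrival of part of a large record.
    void addSpanningBytes(int length) {
        byte[] grown = new byte[spanningBuffer.length + length];
        System.arraycopy(spanningBuffer, 0, grown, 0, spanningBuffer.length);
        spanningBuffer = grown;
    }

    // Drop the buffer so the heap memory becomes eligible for garbage collection.
    void clear() {
        spanningBuffer = new byte[0];
    }

    int retainedBytes() {
        return spanningBuffer.length;
    }
}

// Hypothetical event kinds; the real code distinguishes buffers from events.
enum EventType { DATA, END_OF_PARTITION }

// Hypothetical, heavily simplified input processor (assumption, not Flink's class).
final class SketchInputProcessor {
    private final ChannelDeserializer[] deserializers;

    SketchInputProcessor(int numChannels) {
        deserializers = new ChannelDeserializer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            deserializers[i] = new ChannelDeserializer();
        }
    }

    void processBufferOrEvent(int channel, EventType type, int payloadBytes) {
        ChannelDeserializer deserializer = deserializers[channel];
        if (type == EventType.DATA) {
            if (deserializer == null) {
                throw new IllegalStateException("data after end of partition on channel " + channel);
            }
            deserializer.addSpanningBytes(payloadBytes);
        } else if (deserializer != null) {
            // Proposed behaviour: release right away instead of waiting for cleanup().
            deserializer.clear();
            deserializers[channel] = null;
        }
    }

    // End-of-task cleanup only has to handle channels that have not finished yet.
    void cleanup() {
        for (int i = 0; i < deserializers.length; i++) {
            if (deserializers[i] != null) {
                deserializers[i].clear();
                deserializers[i] = null;
            }
        }
    }

    long retainedBytes() {
        long total = 0;
        for (ChannelDeserializer deserializer : deserializers) {
            if (deserializer != null) {
                total += deserializer.retainedBytes();
            }
        }
        return total;
    }
}
```

Nulling the array slot after clearing keeps {{cleanup()}} idempotent and makes
late data on a finished channel fail loudly rather than silently reallocating.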
As a side note, I was planning to deduplicate the {{StreamTwoInputProcessor}}
and {{StreamInputProcessor}} classes in the near future, but I'm waiting for
the {{TwoInputStreamSelectable}} effort to conclude.
> Release record-deserializer buffers timely to improve the efficiency of heap
> usage on taskmanager
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-12529
> URL: https://issues.apache.org/jira/browse/FLINK-12529
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Operators
> Affects Versions: 1.8.0
> Reporter: Haibo Sun
> Assignee: Haibo Sun
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`),
> each input channel has a corresponding record deserializer. Currently, these
> record deserializers are cleaned up only at the end of the task (see
> `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`).
> This is not a problem for unbounded streams, but it may reduce the efficiency
> of heap memory usage on the taskmanager when an input is a bounded stream.
> For example, suppose all inputs are bounded streams, some of which end very
> early because they carry little data while others end very late because they
> carry a lot. The buffers of the record deserializers for the input channels
> that finished early then sit idle for a long time and are never used again.
> In another case, when the inputs include both unbounded and bounded streams,
> the buffers of the record deserializers corresponding to the bounded streams
> stay idle forever (they are never used again) once the bounded streams finish.
> Especially when the records and the upstream parallelism are large, the
> total size of `SpanningWrapper#buffer` is very large. The size of
> `SpanningWrapper#buffer` is allowed to reach up to 5 MB, so with an upstream
> parallelism of 100 the maximum total size reaches 500 MB (in our production,
> there are jobs with record sizes up to hundreds of KB and upstream
> parallelism up to 1000).
> Overall, after receiving `EndOfPartitionEvent` from an input channel, the
> corresponding record deserializer should be cleared immediately to improve
> the efficiency of heap memory usage on the taskmanager.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)