[ https://issues.apache.org/jira/browse/SAMZA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742547#comment-16742547 ]

Xinyu Liu commented on SAMZA-2044:
----------------------------------

Hi, [~pgeorgantas]: first, thanks a lot for looking into this problem!

I took a quick look at your RB to understand the problem you are solving here.
It seems the original problem is caused by the amount of data emitted from the
window when we reach end-of-stream. The approach in the RB is to consume the
data on the spot instead of returning the whole collection. However, it seems
to me that an earlier step already loads the data into memory before this code
is reached. The loading happens in WindowOperatorImpl, line 258: that line
reads from the RocksDB iterator and loads all the data in the window pane into
memory, which might already cause the OOM.
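
A minimal, hypothetical Java sketch of the point above. This is not Samza's
WindowOperatorImpl code; the class and method names are made up for
illustration. It contrasts copying a store iterator into a List before handing
it on with forwarding each message to a Consumer as it is read. A
Consumer-based handleEndOfStream only lowers peak memory if no earlier step
has already done the copy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Samza's WindowOperatorImpl: two ways to drain an
// iterator such as one opened over a RocksDB-backed window store.
class PaneDrain {

    // Pattern 1: copy the whole pane into memory, then hand it over.
    // Peak heap use grows with the number of messages in the pane.
    static <T> List<T> loadAll(Iterator<T> storeIterator) {
        List<T> pane = new ArrayList<>();
        storeIterator.forEachRemaining(pane::add);
        return pane;
    }

    // Pattern 2: forward each message downstream as soon as it is read.
    // Only the current message needs to be resident.
    static <T> void forEach(Iterator<T> storeIterator, Consumer<? super T> downstream) {
        while (storeIterator.hasNext()) {
            downstream.accept(storeIterator.next());
        }
    }

    public static void main(String[] args) {
        List<Integer> fakeStore = List.of(1, 2, 3, 4, 5);

        // Pattern 1 keeps all five messages alive in 'pane' at once.
        List<Integer> pane = loadAll(fakeStore.iterator());
        System.out.println("buffered: " + pane.size());

        // Pattern 2 sums the messages without building any collection.
        long[] sum = {0};
        forEach(fakeStore.iterator(), m -> sum[0] += m);
        System.out.println("sum: " + sum[0]);
    }
}
```

The fake store is itself an in-memory List purely so the example runs; with a
real RocksDB iterator, only the second pattern avoids holding the whole pane
at once.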

In my opinion, Samza's current window API has two flavors: with an aggregation
function (i.e. the fold-left fn) or without one. To reduce the memory
footprint, using the window with an aggregation fn seems to be the way to go:
it applies the aggregation on the spot, instead of buffering all the data and
then aggregating, which is what causes the memory problem. Could you please
check whether your use case can use this flavor of the window?
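
A hypothetical sketch of why the aggregation flavor is cheaper. FoldLeft below
is a local stand-in assumed to have the same apply(message, oldValue) shape as
Samza's FoldLeftFunction; it is not the Samza interface, and the pane classes
are illustrative only. In Samza itself the flavor is chosen when the window is
built, by supplying an initial value and a fold-left function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch contrasting the two window flavors. FoldLeft is a local
// stand-in, not Samza's FoldLeftFunction.
class WindowFlavors {

    interface FoldLeft<M, WV> {
        WV apply(M message, WV oldValue);
    }

    // With an aggregation function: each arriving message is folded into a
    // single accumulator, so the pane holds one value regardless of volume.
    static final class AggregatingPane<M, WV> {
        private final FoldLeft<M, WV> fold;
        private WV value;

        AggregatingPane(Supplier<WV> initialValue, FoldLeft<M, WV> fold) {
            this.value = initialValue.get();
            this.fold = fold;
        }

        void onMessage(M message) {
            value = fold.apply(message, value);
        }

        WV result() {
            return value;
        }
    }

    // Without an aggregation function: every message is kept until the pane
    // fires, so memory grows with the number of messages in the window.
    static final class BufferingPane<M> {
        private final List<M> messages = new ArrayList<>();

        void onMessage(M message) {
            messages.add(message);
        }

        List<M> result() {
            return messages;
        }
    }

    public static void main(String[] args) {
        AggregatingPane<Integer, Long> counting =
                new AggregatingPane<Integer, Long>(() -> 0L, (m, count) -> count + 1);
        BufferingPane<Integer> buffering = new BufferingPane<>();
        for (int i = 0; i < 1_000; i++) {
            counting.onMessage(i);
            buffering.onMessage(i);
        }
        // Same count, very different memory footprint.
        System.out.println(counting.result() + " vs " + buffering.result().size());
    }
}
```

Both panes see 1,000 messages and report a count of 1,000, but the aggregating
pane retains a single Long while the buffering pane retains every message
until it fires.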

The same problem may not be present in other functions, e.g. timer or process,
since Samza doesn't accumulate their state by itself. You can perform cleanup
and emit the results by implementing ClosableFunction. If that's not enough,
please let us know your use case and we can enrich the API together.
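
A hypothetical sketch of the close-time cleanup described above.
LocalClosableFunction is a stand-in for Samza's ClosableFunction (a default
close() hook), and the Consumer used as an output sink is an assumption made
so the example runs on its own; it is not how Samza delivers output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a user function that keeps a small running aggregate
// and flushes it when closed. Neither the interface nor the sink is Samza API.
class CloseFlush {

    interface LocalClosableFunction {
        default void close() {}
    }

    static final class CountingFunction implements LocalClosableFunction {
        private final Consumer<Long> sink;   // hypothetical output channel
        private long count = 0;

        CountingFunction(Consumer<Long> sink) {
            this.sink = sink;
        }

        // Called once per message; only a counter is retained, not the message.
        void apply(String message) {
            count++;
        }

        // Cleanup hook: emit the final aggregate and reset state.
        @Override
        public void close() {
            sink.accept(count);
            count = 0;
        }
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        CountingFunction fn = new CountingFunction(emitted::add);
        fn.apply("a");
        fn.apply("b");
        fn.close();
        System.out.println(emitted);   // prints [2]
    }
}
```

The function's memory use stays constant per message; only the final result
is produced when close() runs.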

> EOSMessage causes out of memory Exceptions related to WindowOperatorImpl
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-2044
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2044
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 1.0
>            Reporter: Peter Georgantas
>            Priority: Major
>
> The contract of the handleEndOfStream method dictates that a collection of 
> results be returned. In the case of WindowOperatorImpl which has a backing 
> RocksDB store, this effectively causes the entirety of the store to be pulled 
> into memory. In many cases, this will cause out of memory exceptions 
> (otherwise why not keep the store in memory in the first place).
> Since this is a protected api, I have a relatively simple change to propose 
> which could allow data to be consumed downstream as it is brought out of the 
> store:
> {{protected void handleEndOfStream(Consumer<WindowPane<K, Object>> consumer, MessageCollector collector, TaskCoordinator coordinator)}}
> [Pull Request |https://github.com/apache/samza/pull/862]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
