[https://issues.apache.org/jira/browse/SAMZA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743157#comment-16743157]

Peter Georgantas commented on SAMZA-2044:
-----------------------------------------

Hi Xinyu. Thank you very much for looking and taking the time to respond. I 
very much appreciate it.

I understand your point about how data is read from the RocksDB iterator, and 
using the FoldLeftFunction makes perfect sense as a way to reduce memory. 
However, I believe that only helps within a single TriggerKey. My concern with 
the handleEndOfStream method is that data is read from the RocksDB iterator for 
_every TriggerKey remaining in the window_ ([Lines 207-213|https://github.com/apache/samza/blob/54ba4c09fc625024cca145fa340e4ab06571bfcb/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java#L207-L213]).

Let's define some terms and derive a formula approximating the memory usage in 
the normal streaming scenario and in the end-of-stream scenario. Let's ignore 
the FoldLeft mechanism for a moment and assume that all keys are distinct and 
that messages arrive at a constant rate. Over a window of duration *W*, *M* 
messages enter the window, and the window is checked at interval *I*.

During normal streaming, the window is checked *W* / *I* times per window, so 
the memory needed to handle each trigger is about *M* / (*W* / *I*) messages.
For example, if *W* = 5 minutes, *M* = 500,000 messages, and *I* = 600 ms, the 
window is checked 500 times and the memory needed is equivalent to that of 
*1,000 messages*. I'll concede for now that this is acceptable and can be tuned 
down.

However, to handle an End of Stream event, the memory needed is the full *M*: 
all *500,000* messages at once. That is equivalent to the disk space RocksDB 
needs to store the window, hence the question: why use RocksDB for windowing at 
all?

I thought of another analogy that may help our understanding. Think of a Samza 
graph as a tree data structure. The current algorithm for passing messages 
through the graph (analogous to traversing a tree) is breadth-first: all 
messages (paths) for an operator (node) are discovered before continuing to the 
next operator. My proposal changes the algorithm to depth-first: pass each 
message down to the next operator as soon as it is discovered. Breadth-first 
traversal must hold the whole frontier in memory, which can be O(n); depth-first 
traversal only holds the current path, which is O(log(n)) for a balanced tree. 
That is the crux of my proposal.
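A minimal Java sketch of the difference, under simplifying assumptions: a plain in-memory Iterator stands in for the RocksDB iterator, panes are plain strings, and collectThenForward / forwardAsDiscovered are hypothetical names, not Samza APIs. The first mirrors returning a collection from handleEndOfStream; the second mirrors handing each pane to a Consumer, as in the signature proposed in the pull request (collector and coordinator omitted).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class EmitStrategies {

    /** Hypothetical downstream operator that just counts the panes it receives. */
    static final class Downstream implements Consumer<String> {
        int received = 0;

        @Override
        public void accept(String pane) {
            received++;
        }
    }

    /**
     * "Breadth-first": drain every remaining pane into a collection, then hand
     * the collection downstream. Returns the peak number of panes held here.
     */
    static <T> int collectThenForward(Iterator<T> store, Consumer<? super T> downstream) {
        List<T> results = new ArrayList<>();
        while (store.hasNext()) {
            results.add(store.next());
        }
        int peak = results.size(); // every remaining pane is in memory at once
        results.forEach(downstream);
        return peak;
    }

    /**
     * "Depth-first": pass each pane downstream as soon as it is read.
     * Returns the peak number of panes held here (at most one).
     */
    static <T> int forwardAsDiscovered(Iterator<T> store, Consumer<? super T> downstream) {
        int peak = 0;
        while (store.hasNext()) {
            T pane = store.next(); // only this pane is held by this method
            peak = 1;
            downstream.accept(pane); // released before the next read
        }
        return peak;
    }

    /** Lazily generated stand-in for the window store's iterator. */
    static Iterator<String> panes(int n) {
        return IntStream.range(0, n).mapToObj(i -> "pane-" + i).iterator();
    }

    public static void main(String[] args) {
        int keys = 500_000;

        Downstream a = new Downstream();
        int peakCollect = collectThenForward(panes(keys), a);

        Downstream b = new Downstream();
        int peakStream = forwardAsDiscovered(panes(keys), b);

        System.out.println("collect-then-forward: delivered " + a.received + ", peak held " + peakCollect);
        System.out.println("forward-as-discovered: delivered " + b.received + ", peak held " + peakStream);
    }
}
```

Both strategies deliver every pane; the difference is only in how many the emitting operator holds at once, with any further buffering left to the downstream operator.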

I'm happy to discuss some alternative solutions as well. One I thought of was a 
"maximum batch size" constraint for the handleEndOfStream method. The caller 
would repeatedly call handleEndOfStream until an empty collection was returned, 
indicating that the results had been exhausted. This idea seemed less desirable 
because it could have unintended side effects.
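For comparison, here is a rough Java sketch of the batch-size alternative. BatchedFlusher and drain are hypothetical names, and the open Iterator stands in for the RocksDB iterator. Note that the operator now has to keep that iterator open across calls, which is one example of the extra state that could lead to side effects.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

public class BatchedEndOfStream {

    /** Hypothetical operator that keeps its store iterator open between calls. */
    static final class BatchedFlusher<T> {
        private final Iterator<T> store;
        private final int maxBatchSize;

        BatchedFlusher(Iterator<T> store, int maxBatchSize) {
            this.store = store;
            this.maxBatchSize = maxBatchSize;
        }

        /** Returns at most maxBatchSize results; an empty collection means the store is exhausted. */
        Collection<T> handleEndOfStream() {
            List<T> batch = new ArrayList<>(maxBatchSize);
            while (batch.size() < maxBatchSize && store.hasNext()) {
                batch.add(store.next());
            }
            return batch;
        }
    }

    /** Caller side: keep calling until an empty batch signals the results are exhausted. */
    static <T> int drain(BatchedFlusher<T> flusher) {
        int total = 0;
        Collection<T> batch;
        while (!(batch = flusher.handleEndOfStream()).isEmpty()) {
            total += batch.size(); // a real caller would forward the batch downstream here
        }
        return total;
    }

    public static void main(String[] args) {
        Iterator<Integer> store = IntStream.range(0, 2_500).boxed().iterator();
        BatchedFlusher<Integer> flusher = new BatchedFlusher<>(store, 1_000);
        System.out.println("flushed " + drain(flusher) + " results");
    }
}
```

With 2,500 remaining results and a batch size of 1,000, the caller receives batches of 1,000, 1,000 and 500 before the empty collection ends the loop, so peak memory is bounded by the batch size rather than by *M*.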

Thanks again for your consideration. I'll be looking forward to your thoughts.



> EOSMessage causes out of memory Exceptions related to WindowOperatorImpl
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-2044
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2044
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 1.0
>            Reporter: Peter Georgantas
>            Priority: Major
>
> The contract of the handleEndOfStream method dictates that a collection of 
> results be returned. In the case of WindowOperatorImpl which has a backing 
> RocksDB store, this effectively causes the entirety of the store to be pulled 
> into memory. In many cases, this will cause out of memory exceptions 
> (otherwise why not keep the store in memory in the first place).
> Since this is a protected API, I have a relatively simple change to propose 
> which could allow data to be consumed downstream as it is brought out of the 
> store:
> {{protected void handleEndOfStream(Consumer<WindowPane<K, Object>> consumer, MessageCollector collector, TaskCoordinator coordinator)}}
> [Pull Request|https://github.com/apache/samza/pull/862]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
