[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181713#comment-16181713
 ] 

Paolo Rendano edited comment on FLINK-7606 at 9/26/17 10:59 PM:
----------------------------------------------------------------

Hi [~kkl0u],
1) Sure, I have to set the time characteristic, and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double-checked this, and without that setting I do get a memory leak (as 
reported by [~matteoferrario29]). With it set, looking at the memory after the 
test, it seems that used keys are disposed of (the memory comes back to its 
initial size after the last GC). Example (after processing 100k keys, 2 msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've rerun my test related to issue FLINK-7549 with more logging and checked 
the results again. Now it seems that all the expected events are generated, 
but the last chunk of events (maybe thousands) is not generated until I run 
the test again (even 1 more message is enough to trigger the generation of all 
the remaining events). It seems that about 5k input messages are needed before 
it starts to flush the buffer. So the question is: *can you explain the 
strategy for flushing out generated events*? How can I trigger it? As it is 
now, the generation of events can block until a new message is processed 
(maybe one with a watermark that exceeds <last message timestamp>+10 sec).
Just one answer to your questions regarding my last test scenario: parallelism 
is 1, Idle precedes Start, the Idle timestamp is set to x and the Start 
timestamp is set to x+1sec, no delay is set between messages, and during the 
test I see the watermark advancing. Since JMeter generates the messages in a 
loop with no delay between cycles, x+1sec of one cycle can be greater than x 
in the following cycle. I set this up intentionally to verify reordering of 
events.
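
To make the flushing question in 2) concrete, here is a minimal plain-Java sketch of the behaviour I think I am seeing. It is only a simplified model and not Flink's actual implementation. It assumes that elements are buffered in timestamp order and released only once the watermark passes them, and that the watermark is the largest timestamp seen so far minus a fixed bound. The class name, the method names and the 10 second bound (taken from the "+10 sec" above) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model, NOT Flink's actual implementation: elements are buffered
// in timestamp order and released only when the watermark passes them.
class WatermarkFlushSketch {
    // Assumed bound, taken from the "+10 sec" mentioned in the comment.
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L;

    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxSeenTimestamp = Long.MIN_VALUE;

    /** Buffers one element and returns the timestamps released by the new watermark. */
    List<Long> onElement(long timestampMs) {
        buffer.add(timestampMs);
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMs);
        // Assumption: watermark = largest timestamp seen so far minus the bound.
        return advanceTo(maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }

    /** Releases, in timestamp order, every buffered element at or below the watermark. */
    List<Long> advanceTo(long watermarkMs) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermarkMs) {
            released.add(buffer.poll());
        }
        return released;
    }

    /** Number of elements still waiting for the watermark to pass them. */
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        WatermarkFlushSketch op = new WatermarkFlushSketch();
        // Two cycles: Idle at x, Start at x + 1 sec, with the next cycle's x
        // smaller than the previous Start timestamp (out-of-order input).
        long[] input = {0L, 1_000L, 500L, 1_500L};
        for (long ts : input) {
            op.onElement(ts);
        }
        System.out.println("pending after test run: " + op.pending());
        // A single later message moves the watermark and flushes the rest.
        System.out.println("released by one more message: " + op.onElement(12_000L));
    }
}
```

In this model nothing is released at the end of a run because no new timestamp arrives to move the watermark forward, and one later message releases everything at once, in timestamp order. If the real operator works along these lines, that would explain why the last chunk of events only appears when I send one more message.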

Thanks
Paolo



was (Author: i...@paolorendano.it):
Hi [~kkl0u],
1) sure I have to set and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double checked this and without the set I have a memory leak (as reported 
by [~matteoferrario29]). Looking at the memory after the test, it seems that 
used keys are disposed (the memory come back to the initial size after last 
GC). Example (after processing 100k keys 2msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've done again my test related with issue FLINK-7606 adding more logs and 
checking again the result and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) are not generated 
until I run again the test (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems the minimum number is about 
5k input messages before it starts to flush out the buffer. So.. the question 
is: *can you explain the strategy to flush out the generation of events*? How 
to trigger it? Of course as it is now can block the generation of events until 
a new message is processed (maybe with a watermark that exceed that <last 
message timestamp>+10 sec).
Just one answer to your questions regarding my last test scenario: parallelism 
is 1, Idle precedes Start, Idle timestamp is set to x and Start timestamp is 
set to x+1sec, no delay set between messages, and during the test I see the 
watermark advancing. Since the generation of messages in jmeter is in a cycle 
and no delay between cycles, x+1sec of a cycle can be greater than x in the 
following cycle. This was set intentionally by me to verify reordering of 
events.

Thanks
Paolo


> CEP operator leaks state
> ------------------------
>
>                 Key: FLINK-7606
>                 URL: https://issues.apache.org/jira/browse/FLINK-7606
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Matteo Ferrario
>         Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose that the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented, as 
> well as screenshots of the memory leak.


