[ https://issues.apache.org/jira/browse/FLINK-11408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302181#comment-17302181 ]

Kezhu Wang commented on FLINK-11408:
------------------------------------

I think this is the same issue as FLINK-5717, and similar to its event-time
trigger counterpart, FLINK-4862.

The problems are:
 * During merging, the old windows' trigger state is merged away. When {{clear}}
is later called for the old windows, their triggers find no
{{fireTimestampState}}. This causes Bug1.
 * During merging, only the state is merged into the new window's namespace; no
processing-time timer is registered there. This causes Bug2.
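The two points above can be reproduced with a small, hypothetical Java model (plain Java, not Flink's API; {{TriggerMergeModel}}, the window names and the timestamps are invented for illustration). It assumes, as described above, that {{onMerge}} moves the old windows' fire timestamps into the merged window (keeping the earliest one) and that {{clear}} is then called for each old window. With {{fixed}} set to {{false}}, the model shows the reported behavior: the merged window gets state but no timer (Bug2), and {{clear}} throws an NPE when it unboxes the missing timestamp (Bug1). With {{fixed}} set to {{true}}, it sketches one possible fix, a null check in {{clear}} plus registering a timer for the merged fire time in {{onMerge}}. That fix is an assumption for illustration, not a committed patch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified model of the trigger's merge path; this is NOT Flink API.
// Window names stand in for state namespaces.
class TriggerMergeModel {
    final Map<String, Long> fireTimestampState = new HashMap<>(); // per-window fire timestamp
    final Map<String, TreeSet<Long>> timers = new HashMap<>();    // per-window processing-time timers
    final boolean fixed; // false: reported behavior, true: sketched fix

    TriggerMergeModel(boolean fixed) {
        this.fixed = fixed;
    }

    void registerTimer(String window, long ts) {
        timers.computeIfAbsent(window, w -> new TreeSet<>()).add(ts);
    }

    // Old windows' state is moved into the merged window (keeping the earliest
    // timestamp), so the old namespaces end up empty.
    void onMerge(String merged, List<String> oldWindows) {
        for (String w : oldWindows) {
            Long ts = fireTimestampState.remove(w);
            if (ts != null) {
                fireTimestampState.merge(merged, ts, Long::min);
            }
        }
        if (fixed) {
            // Sketched fix for Bug2: give the merged window its own timer.
            Long next = fireTimestampState.get(merged);
            if (next != null) {
                registerTimer(merged, next);
            }
        }
    }

    // Called for each old window after onMerge.
    void clear(String window) {
        Long ts = fireTimestampState.get(window);
        if (fixed && ts == null) {
            return; // Sketched fix for Bug1: the state was already merged away.
        }
        long unboxed = ts; // Reported behavior: unboxing null throws NullPointerException.
        TreeSet<Long> pending = timers.get(window);
        if (pending != null) {
            pending.remove(unboxed);
        }
        fireTimestampState.remove(window);
    }

    // Two session windows "a" and "b", each with a fire timestamp and a timer.
    static TriggerMergeModel twoWindows(boolean fixed) {
        TriggerMergeModel m = new TriggerMergeModel(fixed);
        m.fireTimestampState.put("a", 2000L);
        m.registerTimer("a", 2000L);
        m.fireTimestampState.put("b", 3000L);
        m.registerTimer("b", 3000L);
        return m;
    }

    public static void main(String[] args) {
        for (boolean fixed : new boolean[] {false, true}) {
            TriggerMergeModel m = twoWindows(fixed);
            m.onMerge("ab", Arrays.asList("a", "b"));
            System.out.println((fixed ? "fixed" : "reported") + ": timers for ab = " + m.timers.get("ab"));
            try {
                m.clear("a");
                m.clear("b");
                System.out.println("  clear() completed");
            } catch (NullPointerException e) {
                System.out.println("  clear() threw NullPointerException");
            }
        }
    }
}
```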

[~aljoscha] [~mxm] [~dwysakowicz] Could I take this over, since FLINK-5717 has
gone stale? I plan to add a test similar to
{{ContinuousEventTimeTriggerTest.testMergingWindows}} to cover both cases.

> ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
> --------------------------------------------------------------------------
>
>                 Key: FLINK-11408
>                 URL: https://issues.apache.org/jira/browse/FLINK-11408
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>         Environment: Put both bugs in 
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs]
> This is running Flink 1.7.1 locally.
>            Reporter: Cristian
>            Priority: Major
>
> I was testing session windows using processing time and found a couple of
> problems with ContinuousProcessingTimeTrigger. The first one is an NPE in the
> clear method:
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java]
> The second one, which is most likely related to (and the root cause of) the
> first one, is that the way state is merged for merged windows somehow
> confuses the trigger, so that it stops firing:
>
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]
>
> I managed to solve both of these by using a modified version of 
> ContinuousProcessingTimeTrigger which does NOT call 
> `ctx.mergePartitionedState(stateDesc);` in the onMerge method.
> This is what I understand happens at the trigger level:
>  * The first element in the stream sets an initial fire time if none is set
> yet (the logic is in ContinuousProcessingTimeTrigger#onElement()).
>  * From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is
> responsible for scheduling the next trigger.
>  * When the windows are merged (in the case of session windows), somehow the 
> clear and merge methods are called using the wrong window namespace (I think 
> this is the root cause of the bug, but I'm not too familiar with that code).
>  * Because the state is not cleared properly in the new window namespace, the 
> previously scheduled trigger gets executed against the window that was 
> cleared.
>  * Moreover, the new window has the state of the previous window, which means 
> that:
>  ## onElement will NOT schedule a fire trigger
>  ## onProcessingTime will never be called at all
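The scheduling steps listed above can likewise be modeled in plain Java. This is a hypothetical sketch, not Flink's API: {{ContinuousTriggerModel}}, {{firesAfterMerge}}, the 1000 ms interval and the element times are invented, and the interval-aligned first fire time ({{now - now % interval + interval}}) is an assumption about what {{onElement}} computes. It compares an {{onMerge}} that carries the fire timestamp into the merged window with the reporter's workaround of not calling {{ctx.mergePartitionedState(stateDesc)}}.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model of the trigger's scheduling logic; this is NOT Flink API.
class ContinuousTriggerModel {
    final long interval;
    final boolean mergeStateOnMerge; // true: original onMerge, false: reporter's workaround
    final Map<String, Long> fireTimestamp = new HashMap<>();
    final Map<String, TreeSet<Long>> timers = new HashMap<>();

    ContinuousTriggerModel(long interval, boolean mergeStateOnMerge) {
        this.interval = interval;
        this.mergeStateOnMerge = mergeStateOnMerge;
    }

    private void registerTimer(String window, long ts) {
        timers.computeIfAbsent(window, w -> new TreeSet<>()).add(ts);
    }

    // Schedules a first fire time aligned to the interval, but only if none is stored yet.
    void onElement(String window, long now) {
        if (fireTimestamp.get(window) == null) {
            long next = now - (now % interval) + interval;
            fireTimestamp.put(window, next);
            registerTimer(window, next);
        }
    }

    // When the stored fire time is reached: fire, then schedule the next one.
    boolean onProcessingTime(String window, long time) {
        Long stored = fireTimestamp.get(window);
        if (stored != null && stored == time) {
            fireTimestamp.put(window, time + interval);
            registerTimer(window, time + interval);
            return true;
        }
        return false;
    }

    void onMerge(String merged, List<String> oldWindows) {
        if (!mergeStateOnMerge) {
            return; // Workaround: the merged window starts without a stored fire time.
        }
        for (String w : oldWindows) {
            Long ts = fireTimestamp.remove(w);
            if (ts != null) {
                fireTimestamp.merge(merged, ts, Long::min);
            }
        }
    }

    // Runs the window's timers in order up to `until` and counts how often it fires.
    int advanceTo(String window, long until) {
        int fires = 0;
        TreeSet<Long> pending = timers.getOrDefault(window, new TreeSet<>());
        while (!pending.isEmpty() && pending.first() <= until) {
            if (onProcessingTime(window, pending.pollFirst())) {
                fires++;
            }
        }
        return fires;
    }

    // Two session windows receive elements and are merged; the merged window gets one more element.
    static int firesAfterMerge(boolean mergeStateOnMerge) {
        ContinuousTriggerModel m = new ContinuousTriggerModel(1000L, mergeStateOnMerge);
        m.onElement("a", 100L);
        m.onElement("b", 200L);
        m.onMerge("ab", Arrays.asList("a", "b"));
        m.onElement("ab", 300L); // skipped if "ab" already inherited a fire time
        return m.advanceTo("ab", 5000L);
    }

    public static void main(String[] args) {
        System.out.println("with state merge:    " + firesAfterMerge(true) + " fires");
        System.out.println("without state merge: " + firesAfterMerge(false) + " fires");
    }
}
```

In the model, merging the state leaves the merged window with a stored fire time but no timer, so {{onElement}} skips scheduling and {{onProcessingTime}} is never reached; without the state merge, the merged window schedules its own timer on its next element. The model leaves out {{clear}} for the old windows; with the workaround their state is never merged away, so {{clear}} would still find it.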



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
