[
https://issues.apache.org/jira/browse/FLINK-11408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302181#comment-17302181
]
Kezhu Wang commented on FLINK-11408:
------------------------------------
I think this is the same as FLINK-5717 and similar to its event-time trigger
counterpart, FLINK-4862.
The problems are:
* During merging, the old windows' trigger states are merged into the new
window and removed from the old namespaces. When {{clear}} is later called for
the old windows, their triggers get no {{fireTimestampState}} (it is null). This
causes the NPE in Bug1.
* During merging, only the state is merged into the new window's namespace; no
processing-time timer is registered there. This causes Bug2.
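To make the two problems concrete, here is a plain-Java toy model of one trigger's per-window state. It is not Flink's {{Trigger}}/{{TriggerContext}} API, and every class and method name in it is hypothetical. It assumes the merge behaves like {{mergePartitionedState}} with a min reduce: the old windows' fire times are folded into the new window and removed from the old namespaces. {{onMergeOriginal}} and {{clearOriginal}} reproduce Bug2 and Bug1; {{onMergeFixed}} and {{clearFixed}} sketch one possible fix (register a timer for the merged fire time, and null-check in {{clear}}).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model (NOT Flink's API) of one trigger's per-window state.
// All names are hypothetical; window namespaces are plain strings.
class MergeModel {
    // Stands in for the per-window ReducingState<Long> holding the next fire time.
    final Map<String, Long> fireTimestamp = new HashMap<>();
    // Processing-time timers registered per window namespace.
    final Map<String, TreeSet<Long>> timers = new HashMap<>();

    void registerTimer(String window, long time) {
        timers.computeIfAbsent(window, w -> new TreeSet<>()).add(time);
    }

    // Assumed merge semantics: min-reduce the old values into the target,
    // then drop them from the old namespaces.
    void mergeState(String target, String... mergedWindows) {
        for (String w : mergedWindows) {
            Long value = fireTimestamp.remove(w);
            if (value != null) {
                fireTimestamp.merge(target, value, Long::min);
            }
        }
    }

    // Behaviour described above: only the state moves, no timer for the target (Bug2).
    void onMergeOriginal(String target, String... mergedWindows) {
        mergeState(target, mergedWindows);
    }

    // Possible fix: after merging, register a timer for the merged fire time.
    void onMergeFixed(String target, String... mergedWindows) {
        mergeState(target, mergedWindows);
        Long next = fireTimestamp.get(target);
        if (next != null) {
            registerTimer(target, next);
        }
    }

    // Unboxing the missing value of an already-merged window throws an NPE (Bug1).
    void clearOriginal(String window) {
        long ts = fireTimestamp.get(window);
        timers.getOrDefault(window, new TreeSet<>()).remove(ts);
        fireTimestamp.remove(window);
    }

    // Null-safe clear: if the state was merged away, there is nothing to delete.
    void clearFixed(String window) {
        Long ts = fireTimestamp.remove(window);
        TreeSet<Long> windowTimers = timers.get(window);
        if (ts != null && windowTimers != null) {
            windowTimers.remove(ts);
        }
    }

    public static void main(String[] args) {
        MergeModel model = new MergeModel();
        model.fireTimestamp.put("w1", 1000L);
        model.registerTimer("w1", 1000L);
        model.fireTimestamp.put("w2", 2000L);
        model.registerTimer("w2", 2000L);

        // Order as described in the comment: onMerge first, then clear for the old windows.
        model.onMergeOriginal("w12", "w1", "w2");
        System.out.println("timer for w12: " + model.timers.containsKey("w12"));
        try {
            model.clearOriginal("w1");
        } catch (NullPointerException e) {
            System.out.println("NPE in clear for w1");
        }
    }
}
```

Running {{main}} should print that no timer exists for the merged window {{w12}} and then report the NPE from {{clearOriginal("w1")}}. Swapping in the {{Fixed}} variants avoids both.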
[~aljoscha] [~mxm] [~dwysakowicz] Could I take this over, since FLINK-5717 has
gone stale? I plan to add a test similar to
{{ContinuousEventTimeTriggerTest.testMergingWindows}} to cover both cases.
> ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
> --------------------------------------------------------------------------
>
> Key: FLINK-11408
> URL: https://issues.apache.org/jira/browse/FLINK-11408
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Environment: I put both bugs in
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs].
> This is running Flink 1.7.1 locally.
> Reporter: Cristian
> Priority: Major
>
> I was testing session windows using processing time and found a couple of
> problems with the
> ContinuousProcessingTimeTrigger. The first one is an NPE in the clear method:
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java]
> The second one, which is most likely related to (and the root cause of) the
> first one, is that the way state is merged when windows are merged somehow
> confuses the trigger, so that it stops firing:
>
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]
>
> I managed to solve both of these by using a modified version of
> ContinuousProcessingTimeTrigger which does NOT call
> `ctx.mergePartitionedState(stateDesc);` in the onMerge method.
> This is what I understand happens at the trigger level:
> * The first element in the stream sets an initial fire time (the logic is in
> ContinuousProcessingTimeTrigger#onElement()) if no fire time is set yet.
> * From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is
> responsible for scheduling the next trigger.
> * When the windows are merged (in the case of session windows), somehow the
> clear and merge methods are called using the wrong window namespace (I think
> this is the root cause of the bug, but I'm not too familiar with that code).
> * Because the state is not cleared properly in the new window namespace, the
> previously scheduled trigger gets executed against the window that was
> cleared.
> * Moreover, the new window has the state of the previous window, which means
> that:
> ## onElement will NOT schedule a fire trigger
> ## onProcessingTime will never be called at all
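The walk-through above can be checked with a small plain-Java toy model. It is not Flink's API, and the class and method names are hypothetical. It assumes {{onElement}} computes the next fire time as the next interval boundary after the current processing time, and that merging simply moves the stored fire time into the new window without registering a timer there. With inherited state, {{onElement}} schedules nothing for the merged window, so no timer exists and {{onProcessingTime}} can never be called for it; with the reporter's workaround (state not merged), the merged window starts empty and gets a timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model (NOT Flink's API) of the scheduling logic walked through above,
// for a fixed interval. State and timers are kept per window name; all names
// are hypothetical.
class ContinuousTriggerModel {
    final long interval;
    final Map<String, Long> fireTimestamp = new HashMap<>();
    final Map<String, TreeSet<Long>> timers = new HashMap<>();

    ContinuousTriggerModel(long interval) {
        this.interval = interval;
    }

    private void register(String window, long time) {
        timers.computeIfAbsent(window, w -> new TreeSet<>()).add(time);
    }

    // onElement: schedule the next interval boundary only if no fire time is stored yet.
    void onElement(String window, long now) {
        if (fireTimestamp.get(window) == null) {
            long next = now - (now % interval) + interval;
            register(window, next);
            fireTimestamp.put(window, next);
        }
    }

    // onProcessingTime: the fired timer is consumed; if it matches the stored fire
    // time, fire and schedule the following one. Null-checked here for simplicity.
    // Returns true when the window fires.
    boolean onProcessingTime(String window, long time) {
        TreeSet<Long> pending = timers.get(window);
        if (pending != null) {
            pending.remove(time);
        }
        Long stored = fireTimestamp.get(window);
        if (stored != null && stored == time) {
            fireTimestamp.put(window, time + interval);
            register(window, time + interval);
            return true;
        }
        return false;
    }

    // If no timer is pending, onProcessingTime can never be called for this window.
    boolean hasTimer(String window) {
        TreeSet<Long> pending = timers.get(window);
        return pending != null && !pending.isEmpty();
    }

    public static void main(String[] args) {
        // State carried into the merged window, as when onMerge merges state.
        ContinuousTriggerModel merged = new ContinuousTriggerModel(1000);
        merged.onElement("w1", 1500);                   // schedules 2000 for w1
        merged.fireTimestamp.put("w12", merged.fireTimestamp.remove("w1"));
        merged.onElement("w12", 1600);                  // fire time already stored: nothing scheduled
        System.out.println("merged state, timer for w12: " + merged.hasTimer("w12"));

        // Reporter's workaround: state not merged, so w12 starts empty.
        ContinuousTriggerModel workaround = new ContinuousTriggerModel(1000);
        workaround.onElement("w1", 1500);
        workaround.onElement("w12", 1600);              // schedules 2000 for w12
        System.out.println("workaround, timer for w12: " + workaround.hasTimer("w12"));
    }
}
```

In this model the first line printed is {{false}} and the second is {{true}}, which matches the two numbered consequences listed above.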