[ https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-24501:
-----------------------------------
Labels: pull-request-available (was: )
> Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-24501
> URL: https://issues.apache.org/jira/browse/FLINK-24501
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: JING ZHANG
> Assignee: JING ZHANG
> Priority: Major
> Labels: pull-request-available
>
> *Problem description*
> After recovering from a savepoint or checkpoint, the cumulate window
> aggregate may behave unexpectedly for late events.
> *Bug analysis*
> Currently, in the cumulate window aggregate, late events that belong to an
> already-cleaned slice are merged into the merged window state and are
> counted toward later slices.
> For example, consider a CUMULATE window with a step of 1 minute and a size
> of 1 day:
> {code:sql}
> SELECT window_start, window_end, COUNT(USER_ID)
> FROM TABLE(
>   CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
> GROUP BY window_start, window_end;
> {code}
> When the watermark reaches 11:01, the result of window [00:00, 11:01) is
> emitted. Assume the result is INSERT (00:00, 11:01, 4).
> If a late record with event time 11:00 then arrives, it is merged into the
> merged state and counted toward the later windows, e.g. [00:00, 11:02),
> [00:00, 11:03), and so on. However, the already-emitted result
> INSERT (00:00, 11:01, 4) is neither retracted nor updated.
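The sequence above can be reproduced with a toy simulation. It is a sketch under simplifying assumptions, all labelled in the code: whole minutes since 00:00, no records before 11:00, COUNT as the aggregate, and a late record folded into the merged state without any retraction. `LateEventDemo` and its methods are hypothetical and do not mirror Flink's operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (hypothetical, not Flink's actual operator) of the CUMULATE
// aggregate above: 1-minute slices, all windows start at 00:00, times are
// whole minutes since 00:00, and the aggregate is COUNT.
class LateEventDemo {
    final TreeMap<Long, Long> slices = new TreeMap<>(); // slice end -> count, not yet fired
    long merged = 0;  // merged window state: count of all fired slices
    long watermark;   // current watermark
    final List<String> out = new ArrayList<>();

    LateEventDemo(long watermark) { this.watermark = watermark; }

    void processElement(long eventTime) {
        long sliceEnd = eventTime + 1; // slice [eventTime, eventTime + 1)
        if (sliceEnd <= watermark) {
            merged += 1; // late: slice already fired and cleaned, fold into merged state
        } else {
            slices.merge(sliceEnd, 1L, Long::sum);
        }
    }

    void advanceWatermark(long newWatermark) {
        for (long end = watermark + 1; end <= newWatermark; end++) {
            Long count = slices.remove(end); // fire the slice, then clean its state
            if (count != null) merged += count;
            out.add(String.format("INSERT (00:00, %02d:%02d, %d)", end / 60, end % 60, merged));
        }
        watermark = Math.max(watermark, newWatermark);
    }

    static List<String> run() {
        LateEventDemo op = new LateEventDemo(11 * 60);          // assume nothing before 11:00
        for (int i = 0; i < 4; i++) op.processElement(11 * 60); // four records at 11:00
        op.advanceWatermark(11 * 60 + 1);                       // watermark reaches 11:01
        op.processElement(11 * 60);                             // late record at 11:00
        op.advanceWatermark(11 * 60 + 2);                       // watermark reaches 11:02
        return op.out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

It prints INSERT (00:00, 11:01, 4) followed by INSERT (00:00, 11:02, 5): the late record only shows up from [00:00, 11:02) on, and the earlier result is never corrected.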
> The behavior is different if the job recovers from a savepoint/checkpoint.
> Suppose a savepoint is taken after the watermark reaches 11:01 and
> (00:00, 11:01, 4) has been emitted.
> The job is then recovered from that savepoint. Watermarks are not
> checkpointed and must be regenerated, so after recovery the watermark may
> roll back to 11:00. If a record with event time 11:00 then arrives, it is not
> processed as a late event, and once the watermark reaches 11:01 again, the
> window result INSERT (00:00, 11:01, 5) is emitted downstream.
> As a result, the downstream operator receives two INSERT records for window
> [00:00, 11:01), which may lead to wrong results.
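The recovery scenario can be sketched with a toy model whose snapshot stores slice state and merged state but, like a real checkpoint, not the watermark. All names (`RestoreDemo`, `Snapshot`, `restore`) are hypothetical, and the regenerated watermark of 11:00 is the assumption taken from the scenario above; this does not reproduce Flink's actual state backend or operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (hypothetical, not Flink's operator or state API).
// The snapshot keeps slice state and merged state but not the watermark.
// Times are whole minutes since 00:00; all windows start at 00:00.
class RestoreDemo {
    final TreeMap<Long, Long> slices = new TreeMap<>(); // slice end -> count, not yet fired
    long merged = 0;  // merged window state: count of all fired slices
    long watermark;
    final List<String> out = new ArrayList<>();

    record Snapshot(TreeMap<Long, Long> slices, long merged) {}

    RestoreDemo(long watermark) { this.watermark = watermark; }

    Snapshot snapshot() { return new Snapshot(new TreeMap<>(slices), merged); }

    static RestoreDemo restore(Snapshot s, long regeneratedWatermark) {
        RestoreDemo op = new RestoreDemo(regeneratedWatermark); // watermark is not restored
        op.slices.putAll(s.slices());
        op.merged = s.merged();
        return op;
    }

    void processElement(long eventTime) {
        long sliceEnd = eventTime + 1;
        if (sliceEnd <= watermark) {
            merged += 1; // late: fold into merged state
        } else {
            slices.merge(sliceEnd, 1L, Long::sum);
        }
    }

    void advanceWatermark(long newWatermark) {
        for (long end = watermark + 1; end <= newWatermark; end++) {
            Long count = slices.remove(end); // fire the slice, then clean its state
            if (count != null) merged += count;
            out.add(String.format("INSERT (00:00, %02d:%02d, %d)", end / 60, end % 60, merged));
        }
        watermark = Math.max(watermark, newWatermark);
    }

    static List<String> run() {
        RestoreDemo before = new RestoreDemo(11 * 60);
        for (int i = 0; i < 4; i++) before.processElement(11 * 60);
        before.advanceWatermark(11 * 60 + 1);            // emits the 11:01 result
        Snapshot savepoint = before.snapshot();          // taken after the emit
        RestoreDemo after = restore(savepoint, 11 * 60); // watermark rolls back to 11:00
        after.processElement(11 * 60);                   // no longer treated as late
        after.advanceWatermark(11 * 60 + 1);             // emits the 11:01 result again
        List<String> downstream = new ArrayList<>(before.out);
        downstream.addAll(after.out);
        return downstream;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Downstream receives INSERT (00:00, 11:01, 4) and then INSERT (00:00, 11:01, 5), i.e. two INSERTs for the same window.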
>
> *Solution*
> There are two possible solutions:
> # Save the watermark to state in the slice shared operator (preferred).
> # Change the behavior for late events, e.g. retract the emitted result and
> send the updated result. This requires changing the slice state cleanup
> mechanism, because slice state is currently cleaned once the watermark
> exceeds the slice end.
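The idea behind the preferred option can be sketched with a toy model in which the snapshot also carries the watermark and the restored operator never lets its watermark fall below the saved value. This is a hypothetical illustration of the idea only, not the actual change to Flink's slice shared operator; `PersistedWatermarkDemo`, `Snapshot`, and `restore` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (hypothetical, not Flink's operator or state API) where the
// watermark is saved in state alongside slice and merged state.
// Times are whole minutes since 00:00; all windows start at 00:00.
class PersistedWatermarkDemo {
    final TreeMap<Long, Long> slices = new TreeMap<>(); // slice end -> count, not yet fired
    long merged = 0;  // merged window state: count of all fired slices
    long watermark;
    final List<String> out = new ArrayList<>();

    record Snapshot(TreeMap<Long, Long> slices, long merged, long watermark) {}

    PersistedWatermarkDemo(long watermark) { this.watermark = watermark; }

    Snapshot snapshot() { return new Snapshot(new TreeMap<>(slices), merged, watermark); }

    static PersistedWatermarkDemo restore(Snapshot s, long regeneratedWatermark) {
        // Assumption of this sketch: keep the larger of saved and regenerated watermark.
        PersistedWatermarkDemo op =
                new PersistedWatermarkDemo(Math.max(s.watermark(), regeneratedWatermark));
        op.slices.putAll(s.slices());
        op.merged = s.merged();
        return op;
    }

    void processElement(long eventTime) {
        long sliceEnd = eventTime + 1;
        if (sliceEnd <= watermark) {
            merged += 1; // late: fold into merged state, as in the uninterrupted run
        } else {
            slices.merge(sliceEnd, 1L, Long::sum);
        }
    }

    void advanceWatermark(long newWatermark) {
        for (long end = watermark + 1; end <= newWatermark; end++) {
            Long count = slices.remove(end); // fire the slice, then clean its state
            if (count != null) merged += count;
            out.add(String.format("INSERT (00:00, %02d:%02d, %d)", end / 60, end % 60, merged));
        }
        watermark = Math.max(watermark, newWatermark);
    }

    static List<String> run() {
        PersistedWatermarkDemo before = new PersistedWatermarkDemo(11 * 60);
        for (int i = 0; i < 4; i++) before.processElement(11 * 60);
        before.advanceWatermark(11 * 60 + 1);                       // emits the 11:01 result
        PersistedWatermarkDemo after = restore(before.snapshot(), 11 * 60);
        after.processElement(11 * 60);                              // still late: watermark is 11:01
        after.advanceWatermark(11 * 60 + 1);                        // nothing re-emitted
        after.advanceWatermark(11 * 60 + 2);                        // emits the 11:02 result
        List<String> downstream = new ArrayList<>(before.out);
        downstream.addAll(after.out);
        return downstream;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With the watermark restored, the record at 11:00 is handled as late exactly as in the uninterrupted run, so downstream sees a single INSERT for [00:00, 11:01). The max() guard is a choice made for this sketch; the issue does not specify how stored and regenerated watermarks should be reconciled.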
>