beyond1920 opened a new pull request #17509:
URL: https://github.com/apache/flink/pull/17509
## What is the purpose of the change
### Problem description
After a job recovers from a savepoint or checkpoint, a cumulate window
aggregate may handle late events in an unexpected way.
### Bug analysis
Currently, for a cumulate window aggregate, late events that belong to an
already cleaned slice are merged into the merged window state and are counted
in later slices.
For example, consider a CUMULATE window with a step of 1 minute and a size of
1 day:

    SELECT window_start, window_end, COUNT(USER_ID)
    FROM TABLE(
        CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
    GROUP BY window_start, window_end;

Once the watermark reaches 11:01, the result of window [00:00, 11:01) is
emitted. Assume the result is INSERT (00:00, 11:01, 4).
If a late record with event time 11:00 then arrives, it is merged into the
merged state and counted in the later windows, for example [00:00, 11:02),
[00:00, 11:03), and so on. However, the already emitted result INSERT
(00:00, 11:01, 4) is not retracted or updated.
The behavior is different if the job recovers from a savepoint or checkpoint.
Suppose a savepoint is taken after the watermark reaches 11:01 and
(00:00, 11:01, 4) has been emitted.
The job is then restored from that savepoint. Watermarks are not checkpointed
and must be regenerated, so after recovery the watermark may roll back to
11:00. If a record with event time 11:00 arrives at that point, it is not
treated as a late event, and once the watermark reaches 11:01 again, a window
result INSERT (00:00, 11:01, 5) is emitted downstream.
The downstream operator therefore receives two INSERT records for window
[00:00, 11:01), which may lead to wrong results.
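The two scenarios above can be reproduced with a small single-key simulation. This is only an illustrative sketch in Python, not Flink code: the class `ToyCumulateCount`, its fields, and the `run` helper are hypothetical names, times are minutes since 00:00, and a slice is assumed to fire as soon as the watermark reaches its end.

```python
from dataclasses import dataclass, field


def hhmm(minutes):
    """Format minutes since midnight as HH:MM."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


@dataclass
class ToyCumulateCount:
    """Toy model of a cumulate COUNT for one key (hypothetical, not Flink's operator)."""
    merged: int = 0                               # checkpointed: count over fired slices
    pending: dict = field(default_factory=dict)   # checkpointed: slice_end -> count
    watermark: int = -1                           # NOT checkpointed

    def process(self, ts):
        slice_end = ts + 1  # step is 1 minute, so the slice is [ts, ts + 1)
        if slice_end <= self.watermark:
            # Late: the slice was already fired and cleaned, so the record is
            # folded into the merged state and only shows up in later windows.
            self.merged += 1
        else:
            self.pending[slice_end] = self.pending.get(slice_end, 0) + 1

    def advance_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        fired = []
        for end in sorted(e for e in self.pending if e <= self.watermark):
            self.merged += self.pending.pop(end)
            fired.append(("00:00", hhmm(end), self.merged))
        return fired

    def snapshot(self):
        # The watermark is deliberately left out of the snapshot.
        return {"merged": self.merged, "pending": dict(self.pending)}

    @classmethod
    def restore(cls, snap):
        return cls(merged=snap["merged"], pending=dict(snap["pending"]))


def run(restart):
    op = ToyCumulateCount()
    for ts in (600, 630, 650, 660):      # 10:00, 10:30, 10:50, 11:00
        op.process(ts)
    out = op.advance_watermark(661)      # watermark reaches 11:01
    if restart:
        op = ToyCumulateCount.restore(op.snapshot())
        out += op.advance_watermark(660)  # regenerated watermark is behind
    op.process(660)                      # the record with event time 11:00
    out += op.advance_watermark(661)
    return out


print(run(restart=False)[-1])   # ('00:00', '11:01', 4)
print(run(restart=True)[-2:])   # [('00:00', '11:01', 4), ('00:00', '11:01', 5)]
```

Without a restart the record is absorbed silently as a late event. With a restart the same record produces a second result for the window ending at 11:01.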
This pull request solves the problem by storing the window progress in state,
so that the operator can check whether an input event is late or not.
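The idea can be sketched with a toy single-key model in Python. It is an assumption-laden illustration, not the code in this pull request: `ToyCumulateCountWithProgress`, the `progress` field, and `run_with_progress` are hypothetical names, and how the real operator tracks and updates progress may differ. The point is only that lateness is judged against a checkpointed value rather than the volatile watermark.

```python
from dataclasses import dataclass, field


def hhmm(minutes):
    """Format minutes since midnight as HH:MM."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


@dataclass
class ToyCumulateCountWithProgress:
    """Toy model for one key; `progress` is a hypothetical checkpointed field."""
    merged: int = 0                               # checkpointed
    pending: dict = field(default_factory=dict)   # checkpointed: slice_end -> count
    progress: int = -1                            # checkpointed: how far windows have fired
    watermark: int = -1                           # NOT checkpointed

    def process(self, ts):
        slice_end = ts + 1  # step is 1 minute
        # Lateness is decided by the stored progress, not the current watermark.
        if slice_end <= self.progress:
            self.merged += 1
        else:
            self.pending[slice_end] = self.pending.get(slice_end, 0) + 1

    def advance_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        # Progress never moves backwards, even if the watermark was reset.
        self.progress = max(self.progress, self.watermark)
        fired = []
        for end in sorted(e for e in self.pending if e <= self.watermark):
            self.merged += self.pending.pop(end)
            fired.append(("00:00", hhmm(end), self.merged))
        return fired

    def snapshot(self):
        return {"merged": self.merged, "pending": dict(self.pending),
                "progress": self.progress}

    @classmethod
    def restore(cls, snap):
        return cls(merged=snap["merged"], pending=dict(snap["pending"]),
                   progress=snap["progress"])


def run_with_progress(restart):
    op = ToyCumulateCountWithProgress()
    for ts in (600, 630, 650, 660):      # 10:00, 10:30, 10:50, 11:00
        op.process(ts)
    out = op.advance_watermark(661)      # emits (00:00, 11:01, 4)
    if restart:
        op = ToyCumulateCountWithProgress.restore(op.snapshot())
        out += op.advance_watermark(660)  # watermark rolled back to 11:00
    op.process(660)                      # late in both runs
    op.process(661)                      # on-time record with event time 11:01
    out += op.advance_watermark(662)
    return out


# Both runs emit the same results, with a single result for the 11:01 window.
print(run_with_progress(restart=True) == run_with_progress(restart=False))
```

In this sketch the record with event time 11:00 is treated as late whether or not the job was restored, so it is only counted in the later window ending at 11:02.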
## Brief change log
- Update `WindowValueState` so that it can be reused to store window progress
- Update `AbstractWindowAggProcessor` to store window progress in keyed
state
## Verifying this change
- Update the unit test
`SlicingWindowAggOperatorTest#testEventTimeCumulativeWindows` to check that a
late event is still processed as a late event even if the job is restored from
a savepoint.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)