beyond1920 opened a new pull request #17509:
URL: https://github.com/apache/flink/pull/17509


   ## What is the purpose of the change
   
   ### Problem description
   
   After recovering from a savepoint or checkpoint, a cumulate window 
aggregate may behave unexpectedly for late events.
   
   ### Bug analysis
   
   Currently, for a cumulate window aggregate, late events that belong to an 
already cleaned slice are merged into the merged window state and are counted 
in the later slices.
   
   For example, take a CUMULATE window with a step of 1 minute and a size of 
1 day:
   
       SELECT window_start, window_end, COUNT(USER_ID)
         FROM TABLE(
           CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
         GROUP BY window_start, window_end;
   
   When the watermark reaches 11:01, the result of window [00:00, 11:01) is 
emitted. Let's assume the result is INSERT (00:00, 11:01, 4).
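
To make the window boundaries concrete, the following is a minimal sketch of 
which cumulate windows contain a given event time. It is plain Python, not 
Flink code; `cumulate_windows` and `fmt` are hypothetical helpers, times are 
minutes since midnight, and the size is assumed to be a multiple of the step.

```python
def cumulate_windows(ts, step, size):
    """Return (start, end) of every cumulate window containing ts.

    All values are minutes since midnight; size is assumed to be a
    multiple of step.
    """
    start = ts - ts % size                        # start of the enclosing max-size window
    first_end = ts - (ts - start) % step + step   # end of the slice that holds ts
    return [(start, end) for end in range(first_end, start + size + 1, step)]


def fmt(minute):
    """Format minutes since midnight as HH:MM."""
    return f"{minute // 60:02d}:{minute % 60:02d}"


# A record with event time 11:00, step 1 minute, size 1 day.
windows = cumulate_windows(ts=11 * 60, step=1, size=24 * 60)
first_start, first_end = windows[0]
print(f"[{fmt(first_start)}, {fmt(first_end)})")  # [00:00, 11:01)
```

The earliest window containing a record with event time 11:00 is 
[00:00, 11:01), which is why such a record counts as late once that window 
has fired.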
   
   If a late record with event time 11:00 then arrives, it is merged into the 
merged state and counted in the later slices, for example in windows 
[00:00, 11:02), [00:00, 11:03), and so on. However, the already emitted window 
result INSERT (00:00, 11:01, 4) is not retracted or updated.
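
The late-event path described above can be traced with a small toy model. 
This is only a sketch under simplifying assumptions (a single key, one result 
emitted per watermark advance, times in minutes since midnight); `process` 
and `timeline` are hypothetical names and do not correspond to Flink classes.

```python
def process(timeline, step=1):
    """Replay ("record", ts) and ("watermark", wm) entries for one key."""
    slices = {}      # slice_end -> count of records not yet merged
    merged = 0       # merged window state (running COUNT)
    watermark = -1   # current watermark
    emitted = []     # results sent downstream as (start, end, count)
    for kind, minute in timeline:
        if kind == "record":
            end = minute - minute % step + step
            if end <= watermark:
                # Late: the slice is already cleaned, so the record is
                # folded straight into the merged state.
                merged += 1
            else:
                slices[end] = slices.get(end, 0) + 1
        else:
            # Merge every slice covered by the watermark, then fire.
            for end in [e for e in slices if e <= minute]:
                merged += slices.pop(end)
            watermark = minute
            emitted.append((0, minute, merged))
    return emitted


timeline = [
    ("record", 600), ("record", 630), ("record", 645), ("record", 659),
    ("watermark", 661),   # fires [00:00, 11:01) with count 4
    ("record", 660),      # late record with event time 11:00
    ("watermark", 662),   # fires [00:00, 11:02) with count 5
]
print(process(timeline))  # [(0, 661, 4), (0, 662, 5)]
```

The late record only shows up from [00:00, 11:02) onward; the result for 
[00:00, 11:01) stays at 4.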
   
   The behavior is different if the job recovers from a savepoint or checkpoint.
   
   Let's take a savepoint after the watermark reaches 11:01 and 
(00:00, 11:01, 4) has been emitted.
   
   Then restore the job from the savepoint. Watermarks are not checkpointed 
and need to be repopulated. So after recovery, the watermark may roll back to 
11:00. If a record with event time 11:00 then arrives, it is not processed as 
a late event, and once the watermark reaches 11:01 again, a window result 
INSERT (00:00, 11:01, 5) is emitted downstream.
   
   So the downstream operator receives two INSERT records for window 
[00:00, 11:01), which may lead to wrong results.
   
   This pull request aims to solve the problem by storing the window progress 
in state in order to check whether an input event is late or not.
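
The effect of keeping the window progress in state can be illustrated with a 
toy model of the restore scenario. This is a sketch of the idea only, not the 
actual change to `AbstractWindowAggProcessor`: the class 
`ToyCumulateOperator`, its `persist_progress` flag, and the `replay` helper 
are all hypothetical, and the model assumes a single key with times in 
minutes since midnight.

```python
class ToyCumulateOperator:
    """Toy single-key COUNT over a cumulate window; not Flink code."""

    def __init__(self, step=1, persist_progress=False):
        self.step = step
        self.persist_progress = persist_progress
        self.slices = {}     # state: slice_end -> count of unmerged records
        self.merged = 0      # state: merged window count
        self.progress = -1   # end of the last fired window
        self.emitted = []    # results sent downstream

    def on_record(self, ts):
        end = ts - ts % self.step + self.step
        if end <= self.progress:
            self.merged += 1   # late: only later windows will include it
        else:
            self.slices[end] = self.slices.get(end, 0) + 1

    def advance_watermark(self, wm):
        if wm <= self.progress:
            return             # this window is known to have fired already
        for end in [e for e in self.slices if e <= wm]:
            self.merged += self.slices.pop(end)
        self.progress = wm
        self.emitted.append((0, wm, self.merged))

    def snapshot(self):
        state = {"slices": dict(self.slices), "merged": self.merged}
        if self.persist_progress:
            state["progress"] = self.progress
        return state

    @classmethod
    def restore(cls, state, step=1, persist_progress=False):
        op = cls(step, persist_progress)
        op.slices = dict(state["slices"])
        op.merged = state["merged"]
        op.progress = state.get("progress", -1)  # lost unless it was persisted
        return op


def replay(persist_progress):
    """Run the savepoint scenario and return everything sent downstream."""
    op = ToyCumulateOperator(persist_progress=persist_progress)
    for ts in (600, 630, 645, 659):
        op.on_record(ts)
    op.advance_watermark(661)   # emits (0, 661, 4), then the savepoint is taken
    restored = ToyCumulateOperator.restore(
        op.snapshot(), persist_progress=persist_progress)
    restored.on_record(660)     # record with event time 11:00 after restore
    restored.advance_watermark(661)
    restored.advance_watermark(662)
    return op.emitted + restored.emitted


print(replay(persist_progress=False))  # [(0, 661, 4), (0, 661, 5), (0, 662, 5)]
print(replay(persist_progress=True))   # [(0, 661, 4), (0, 662, 5)]
```

Without the persisted progress, the toy operator emits window 
[00:00, 11:01) twice; with it, the record is treated as late after the 
restore, just as it would have been without one.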
   
   ## Brief change log
   
     - Update `WindowValueState` so that it can be reused to store window 
progress
     - Update `AbstractWindowAggProcessor` to store window progress in keyed 
state
   
   ## Verifying this change
     - Update the UT 
`SlicingWindowAggOperatorTest#testEventTimeCumulativeWindows` to check that a 
late event is still processed as a late event even if the job is restored from 
a savepoint.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   

