[
https://issues.apache.org/jira/browse/FLINK-39932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088848#comment-18088848
]
Sameer Devgan commented on FLINK-39932:
---------------------------------------
I have identified the fix and would like to be assigned this ticket so that I can open a PR.
> [flink-datastream] Merging-Window onRecord Callback Receives Wrong Window
> Context in DataStream V2
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-39932
> URL: https://issues.apache.org/jira/browse/FLINK-39932
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 2.1.3, 2.1.4
> Reporter: Sameer Devgan
> Priority: Major
>
> In {{OneInputWindowProcessOperator.processElement}}, after
> {{MergingWindowSet.addWindow()}} computes the post-merge result
> ({{actualWindow}}), two lines in the merging branch still reference the
> original pre-merge candidate window ({{window}}) instead of
> {{actualWindow}}:
> {code:java}
> outputCollector.setTimestamp(window.maxTimestamp());{code}
> {code:java}
> windowFunctionContext.setWindow(window);{code}
> {{windowFunctionContext}} and {{outputCollector}} are set from {{window}}
> (so the bug propagates to both), while the trigger context correctly
> references the actual (merged) window:
> {code:java}
> triggerContext.setWindow(actualWindow);{code}
> *Impact*
> When two session windows merge, the {{WindowContext}} exposed to the user
> function reports the bounds of the stale pre-merge window rather than those
> of the merged window, so any logic inside {{onRecord}} that reads
> {{windowContext.getStartTime()}} or {{getEndTime()}} operates on stale
> pre-merge data.
> Also,
> {code:java}
> outputCollector.setTimestamp(window.maxTimestamp());{code}
> assigns the candidate window's max timestamp to records emitted during
> {{onRecord}}. For a late record, all emitted records carry the incorrectly
> early timestamp of the candidate window rather than that of the merged
> window.
> *Reproduce*
> 1) Create a session window with an inactivity gap of 10s.
> 2) Process two records: R1 at timestamp 0s and R2 at timestamp 5s.
> {{windowFunctionContext.setWindow(window)}} uses the candidate
> [5_000, 15_000) instead of {{actualWindow}} [0, 15_000), so
> {{getStartTime()}} returns 5_000 instead of 0.
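The arithmetic in the reproduction steps can be checked with a small standalone sketch. It does not use any Flink classes: {{Win}}, {{candidate}} and {{addWindow}} are hypothetical stand-ins for the time window, the session assigner and {{MergingWindowSet.addWindow()}}, and {{maxTimestamp()}} is assumed to be {{end - 1}}, as in Flink's {{TimeWindow}}. It also includes an out-of-order case (R1 at 5s, then R2 at 0s), which is not part of the reproduction steps but appears to be the situation in which the emitted timestamp differs.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {
    /** Hypothetical stand-in for a time window covering [start, end) in ms. */
    record Win(long start, long end) {
        // Assumption: same convention as Flink's TimeWindow (end - 1).
        long maxTimestamp() { return end - 1; }
        boolean intersects(Win o) { return start <= o.end && end >= o.start; }
        Win cover(Win o) {
            return new Win(Math.min(start, o.start), Math.max(end, o.end));
        }
    }

    static final long GAP_MS = 10_000; // session inactivity gap of 10s

    /** Pre-merge candidate window for a record with the given timestamp. */
    static Win candidate(long timestampMs) {
        return new Win(timestampMs, timestampMs + GAP_MS);
    }

    /** Merges the candidate into the window list; returns the post-merge window. */
    static Win addWindow(List<Win> windows, Win candidate) {
        Win merged = candidate;
        boolean changed;
        do {
            changed = false;
            for (int i = 0; i < windows.size(); i++) {
                if (windows.get(i).intersects(merged)) {
                    merged = merged.cover(windows.remove(i));
                    changed = true;
                    break;
                }
            }
        } while (changed);
        windows.add(merged);
        return merged;
    }

    public static void main(String[] args) {
        // In-order case from the report: R1 at 0s, R2 at 5s.
        List<Win> inOrder = new ArrayList<>();
        addWindow(inOrder, candidate(0));
        Win window = candidate(5_000);
        Win actualWindow = addWindow(inOrder, window);
        System.out.println("candidate start = " + window.start()
                + ", merged start = " + actualWindow.start());

        // Out-of-order case (assumed for illustration): R1 at 5s, then R2 at 0s.
        List<Win> late = new ArrayList<>();
        addWindow(late, candidate(5_000));
        Win lateWindow = candidate(0);
        Win lateActual = addWindow(late, lateWindow);
        System.out.println("candidate maxTimestamp = " + lateWindow.maxTimestamp()
                + ", merged maxTimestamp = " + lateActual.maxTimestamp());
    }
}
```

In the in-order case the two windows differ only in their start (5_000 versus 0), which matches the {{getStartTime()}} symptom. In the out-of-order case the candidate's max timestamp (9_999) is earlier than the merged window's (14_999), which matches the early-timestamp symptom described under Impact.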
--
This message was sent by Atlassian Jira
(v8.20.10#820010)