Sameer Devgan created FLINK-39932:
-------------------------------------

             Summary: [flink-datastream] Merging-Window onRecord Callback Receives Wrong Window Context in DataStream V2
                 Key: FLINK-39932
                 URL: https://issues.apache.org/jira/browse/FLINK-39932
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 2.1.3, 2.1.4
            Reporter: Sameer Devgan


In {{OneInputWindowProcessOperator.processElement}}, after
{{MergingWindowSet.addWindow()}} computes the post-merge result
({{actualWindow}}), two lines in the merging branch still reference the
original pre-merge candidate window ({{window}}) instead of
{{actualWindow}}:
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
{code:java}
windowFunctionContext.setWindow(window);{code}
The stale {{window}} therefore propagates into {{windowFunctionContext}} and
{{outputCollector}}, whereas the trigger context already receives the correct,
merged window:
{code:java}
triggerContext.setWindow(actualWindow);{code}
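To make the intended fix concrete, here is a minimal sketch of the merging branch in which all three consumers receive {{actualWindow}}. {{Window}}, {{Context}}, {{Collector}} and {{onMergedWindow}} are hypothetical stand-ins, not Flink's real classes, and {{maxTimestamp()}} is assumed to follow Flink's {{TimeWindow}} convention of {{end - 1}}.

{code:java}
// Hypothetical stand-ins, NOT Flink's real classes: just enough structure
// to show which window each consumer should receive after a merge.
public class MergingBranchFixSketch {

    /** Simplified [start, end) window; maxTimestamp() assumes TimeWindow's end - 1 convention. */
    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Stand-in for a context that remembers which window it was given. */
    static final class Context {
        Window window;

        void setWindow(Window w) {
            this.window = w;
        }
    }

    /** Stand-in for the output collector's timestamp slot. */
    static final class Collector {
        long timestamp;

        void setTimestamp(long ts) {
            this.timestamp = ts;
        }
    }

    final Context triggerContext = new Context();
    final Context windowFunctionContext = new Context();
    final Collector outputCollector = new Collector();

    /**
     * Merging branch after addWindow() has produced the post-merge window.
     * The candidate 'window' is kept only to mirror the original code; it is
     * deliberately no longer used.
     */
    void onMergedWindow(Window window, Window actualWindow) {
        triggerContext.setWindow(actualWindow);
        windowFunctionContext.setWindow(actualWindow);              // was: window
        outputCollector.setTimestamp(actualWindow.maxTimestamp());  // was: window.maxTimestamp()
    }

    public static void main(String[] args) {
        MergingBranchFixSketch op = new MergingBranchFixSketch();
        Window candidate = new Window(5_000L, 15_000L);
        Window merged = new Window(0L, 15_000L);
        op.onMergedWindow(candidate, merged);
        System.out.println(op.windowFunctionContext.window.start); // 0
        System.out.println(op.outputCollector.timestamp);          // 14999
    }
}
{code}

Passing {{actualWindow}} everywhere keeps the trigger, the user-visible {{WindowContext}} and the emitted timestamps consistent with one another.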

*Impact*

When two session windows merge, the {{WindowContext}} exposed to the user
function reports the stale candidate window's bounds rather than the merged
window's bounds. Any logic inside {{onRecord}} that reads
{{windowContext.getStartTime()}} or {{getEndTime()}} therefore operates on
stale pre-merge data.

Also, the line
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
assigns the candidate window's max timestamp to records emitted during
{{onRecord}}. For a late (out-of-order) record whose candidate window ends
before the merged window does, every emitted record therefore carries an
incorrectly early timestamp taken from the candidate window rather than from
the merged window.
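The timestamp effect can be illustrated with plain arithmetic. The sketch below assumes a 10s gap, hypothetical {{Window}} and {{candidateFor}} helpers (not Flink API) and the {{end - 1}} convention for {{maxTimestamp()}}; R1 at 5s arrives first, then a late R2 at 0s merges into its session.

{code:java}
// Hypothetical illustration, NOT Flink code: for an out-of-order record the
// candidate window can end earlier than the merged session, so its
// maxTimestamp() is too early.
public class LateRecordTimestampSketch {

    static final long GAP_MS = 10_000L;

    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Assumed TimeWindow convention: last timestamp inside [start, end).
        long maxTimestamp() {
            return end - 1;
        }

        // Smallest window covering both this and o (the merged session).
        Window cover(Window o) {
            return new Window(Math.min(start, o.start), Math.max(end, o.end));
        }
    }

    // Pre-merge candidate session window for a record: [ts, ts + gap).
    static Window candidateFor(long ts) {
        return new Window(ts, ts + GAP_MS);
    }

    public static void main(String[] args) {
        Window existing = candidateFor(5_000L);     // R1 first: [5000, 15000)
        Window candidate = candidateFor(0L);        // late R2: [0, 10000), overlaps R1
        Window merged = existing.cover(candidate);  // [0, 15000)

        System.out.println("timestamp with bug:  " + candidate.maxTimestamp()); // 9999
        System.out.println("timestamp after fix: " + merged.maxTimestamp());    // 14999
    }
}
{code}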
*Reproduce*
1) Create a session window with an inactivity gap of 10s.
2) Process two records: R1 at timestamp 0s and R2 at timestamp 5s.

When R2 is processed, {{windowFunctionContext.setWindow(window)}} uses the
candidate window [5_000, 15_000) instead of {{actualWindow}} [0, 15_000), so
{{getStartTime()}} returns 5_000 instead of 0.
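The reproduce steps can be traced with a toy session-merging model. {{SessionReproSketch}}, {{addRecord}} and {{candidateFor}} are hypothetical and only approximate what {{MergingWindowSet.addWindow()}} does; the intersection rule is modeled on Flink's {{TimeWindow.intersects}}.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy model, NOT Flink's MergingWindowSet: it only mimics how a
// new session window is merged with existing, intersecting sessions.
public class SessionReproSketch {

    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Intersection rule modeled on Flink's TimeWindow.intersects.
        boolean intersects(Window o) {
            return start <= o.end && end >= o.start;
        }

        // Smallest window covering both this and o.
        Window cover(Window o) {
            return new Window(Math.min(start, o.start), Math.max(end, o.end));
        }
    }

    private final long gapMs;
    private final List<Window> sessions = new ArrayList<>();

    SessionReproSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    // The record's own (pre-merge) candidate window: [ts, ts + gap).
    Window candidateFor(long timestamp) {
        return new Window(timestamp, timestamp + gapMs);
    }

    // Adds the candidate window and returns the post-merge ("actual") window.
    Window addRecord(long timestamp) {
        Window actual = candidateFor(timestamp);
        boolean mergedAny = true;
        while (mergedAny) { // repeat: a grown window may now touch more sessions
            mergedAny = false;
            Iterator<Window> it = sessions.iterator();
            while (it.hasNext()) {
                Window w = it.next();
                if (w.intersects(actual)) {
                    actual = actual.cover(w);
                    it.remove();
                    mergedAny = true;
                }
            }
        }
        sessions.add(actual);
        return actual;
    }

    public static void main(String[] args) {
        SessionReproSketch ops = new SessionReproSketch(10_000L);
        ops.addRecord(0L);                            // R1 -> [0, 10000)
        Window window = ops.candidateFor(5_000L);     // R2 candidate -> [5000, 15000)
        Window actualWindow = ops.addRecord(5_000L);  // merged -> [0, 15000)
        System.out.println("start seen with bug: " + window.start);       // 5000
        System.out.println("start after fix:     " + actualWindow.start); // 0
    }
}
{code}

In this scenario both windows end at 15_000, so {{maxTimestamp()}} happens to agree and the start time is what exposes the bug; the timestamp error only shows up for an out-of-order record whose candidate window ends earlier than the merged one.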



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
