[ 
https://issues.apache.org/jira/browse/FLINK-33192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853344#comment-17853344
 ] 

Kartikey Pant commented on FLINK-33192:
---------------------------------------

Hello [~Yanfei Lei],

I've submitted a pull request ([GitHub Pull Request 
#24917|https://github.com/apache/flink/pull/24917]) that fixes the reported 
bug. I also believe a similar memory leak is possible in the MergingWindow 
branch 
([https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L369-L388]), 
so I've included the corresponding change in this pull request as well.

Could you please review it and let me know if everything looks good, or if any 
further adjustments or validations are necessary?

Thank you.

> State memory leak in the Window Operator due to unregistered cleanup timer
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33192
>                 URL: https://issues.apache.org/jira/browse/FLINK-33192
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.14.6, 1.15.4, 1.16.2, 1.17.1
>            Reporter: Vidya Sagar Kalvakunta
>            Assignee: Kartikey Pant
>            Priority: Major
>              Labels: easyfix, pull-request-available
>
> I have encountered a state memory leak in the default window operator. 
> When a window fires immediately after creation but emits no result, its 
> cleanup timer is never registered: the continue statement in the code below 
> skips the registerCleanupTimer(window) call. The window has already been 
> added to the window state, so without a cleanup timer it is never cleaned 
> up and lives forever.
> *Steps to Reproduce:*
>  # Write a custom trigger that fires on every element.
>  # Write a custom aggregate function that never produces a result.
>  # Use a default tumbling event time window with this custom trigger and 
> aggregate function.
>  # Publish events spanning multiple time windows.
>  # Observe that the window state still contains all the windows, even after 
> their expiry/cleanup time has passed.
> *Code with the bug:*
> [https://github.com/apache/flink/blob/cd95b560d0c11a64b42bf6b98107314d32a4de86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L398-L417|https://github.com/apache/flink/blob/cd95b560d0c11a64b42bf6b98107314d32a4de86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L399-L417]
> {code:java}
> windowState.setCurrentNamespace(window);
> windowState.add(element.getValue());
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents == null) {
>         continue;
>     }
>     emitWindowContents(window, contents);
> }
> if (triggerResult.isPurge()) {
>     windowState.clear();
> }
> registerCleanupTimer(window);{code}
>  
> *Expected Result:*
> The cleanup timer should be registered for every window that's added to the 
> window state, regardless of whether it emits a result when it fires.
> *Actual Result:*
> The cleanup timer is not registered for a window that emits no result when 
> it fires, so the window state that was already created lives on 
> indefinitely.
> *Impact:*
> This issue led to a huge state memory leak in our applications and was very 
> challenging to identify.
>  
> *Fix:*
> There are two ways to fix this issue. I'm willing to create a PR with the fix 
> if approved.
> 1. Register the cleanup timer immediately after a window is added to the 
> state.
> {code:java}
> windowState.setCurrentNamespace(window);
> windowState.add(element.getValue());
> registerCleanupTimer(window);{code}
> 2. Emit the result only when the contents are not null, and remove the 
> continue statement so that execution still reaches 
> registerCleanupTimer(window).
> {code:java}
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents != null) {         
>         emitWindowContents(window, contents);
>     }
> } {code}
>  
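The control flow in the quoted description can be checked without a Flink cluster using a small stand-alone Java model. This is only a sketch under stated assumptions: CleanupTimerModel, Variant, and windowsWithoutTimer are hypothetical names, plain sets stand in for the window state and the timer service, the trigger is assumed to fire on every element, and fix 1 is assumed to move the registration call rather than duplicate it. It is not the code from WindowOperator or from the pull request.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CleanupTimerModel {

    /** The reported control flow and the two fixes proposed in the issue. */
    enum Variant { REPORTED, REGISTER_FIRST, GUARDED_EMIT }

    /**
     * Processes one element per window with a trigger that always fires and
     * returns the windows that hold state but have no cleanup timer.
     */
    static Set<Long> windowsWithoutTimer(
            Variant variant, List<Long> windows, boolean resultIsNull) {
        Set<Long> windowState = new LinkedHashSet<>();   // stand-in for the window state
        Set<Long> cleanupTimers = new LinkedHashSet<>(); // stand-in for registered timers

        for (long window : windows) {
            windowState.add(window); // models windowState.add(element.getValue())

            if (variant == Variant.REGISTER_FIRST) {
                cleanupTimers.add(window); // fix 1: register right after adding state
            }

            // Models windowState.get() for an aggregate that may yield no result.
            String contents = resultIsNull ? null : "result";

            // REPORTED and REGISTER_FIRST keep the early continue on null contents;
            // GUARDED_EMIT (fix 2) only skips the emission, not the rest of the loop.
            if (contents == null && variant != Variant.GUARDED_EMIT) {
                continue;
            }

            // Emission is omitted; only timer registration matters in this model.
            if (variant != Variant.REGISTER_FIRST) {
                cleanupTimers.add(window); // models registerCleanupTimer(window)
            }
        }

        Set<Long> leaked = new LinkedHashSet<>(windowState);
        leaked.removeAll(cleanupTimers);
        return leaked;
    }

    public static void main(String[] args) {
        List<Long> windows = List.of(0L, 1000L, 2000L);
        for (Variant variant : Variant.values()) {
            System.out.println(variant + " -> " + windowsWithoutTimer(variant, windows, true));
        }
    }
}
```

With resultIsNull set to true, the REPORTED variant returns every window as lacking a cleanup timer, while both proposed fixes return an empty set. With resultIsNull set to false, all three variants return an empty set, which matches the observation that the leak only appears when a fired window emits nothing.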



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
