[https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Piotr Nowojski closed FLINK-25728.
----------------------------------
Fix Version/s: 1.15.0, 1.13.6, 1.14.4
Resolution: Fixed
Merged to master as a7eadf57e42^ and a7eadf57e42
Merged to release-1.14 as 761a4623dda^ and 761a4623dda
Merged to release-1.13 as 9653999a27b^ and 9653999a27b
> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
> Key: FLINK-25728
> URL: https://issues.apache.org/jira/browse/FLINK-25728
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
> Reporter: pc wang
> Assignee: pc wang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.15.0, 1.13.6, 1.14.4
>
> Attachments: flink-completablefuture-issue.tar.xz,
> image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The
> non-broadcast input receives roughly 10 million messages per second, while the
> broadcast side is a control stream that rarely carries any messages.
> After several hours of running, the TaskManager runs out of heap memory and
> restarts. We reviewed the application code but found no relevant issues.
> We noticed that the time from start to crash was roughly the same on every
> run. We then took a heap dump shortly before a crash and found a large number
> of `CompletableFuture$UniRun` instances, which together consumed several
> gigabytes of memory.
>
> The following screenshot is from a heap dump of a mock test job that
> reproduces the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>
> After some source code research, we found that the leak is likely caused by
> *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its
> *availableFuture* completes when the *availableFuture* of any of its inputs
> completes.
> The current implementation creates a new *CompletableFuture* on every call and
> appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's
> *availableFuture*.
> The problem is that these *CompletableFuture$UniRun* instances keep stacking up
> on the *availableFuture* of the slow inputProcessor (here, the broadcast side),
> which stays incomplete for a long time.
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the outer
> *StreamMultipleInputProcessor*'s availableFuture, a large number of
> *CompletableFuture* instances cannot be garbage-collected.
> We modified the *StreamMultipleInputProcessor*.*getAvailableFuture* function
> and verified that the issue no longer occurs in our modified version.
> We are willing to open a PR with this fix.
> Heap dump file: [^flink-completablefuture-issue.tar.xz]
> PS: This is a YourKit heap dump, so it may not be compatible with HPROF tools.
> [Sample Code to reproduce the
> issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]
>
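The stacking pattern described in the report can be illustrated with a small standalone Java sketch. It is not Flink code and not the merged fix: `AnyOfStackingDemo`, `naiveAnyOf` and `GuardedAnyOf` are hypothetical names, the busy input is modeled as an already-completed future, the idle broadcast input as a future that never completes, and the sketch assumes `get()` is called from a single thread. `naiveAnyOf` mirrors the pattern from the report (a new combined future plus a new `thenRun` callback on every input per call), while `GuardedAnyOf` shows one possible guard that registers a callback only once per pending future instance.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical illustration of the callback stacking described in FLINK-25728.
 * Not Flink code: all names here are made up for this sketch.
 */
public class AnyOfStackingDemo {

    /** Naive "any input available" combiner: attaches a fresh callback to every input on each call. */
    static CompletableFuture<Void> naiveAnyOf(CompletableFuture<?>[] inputs) {
        CompletableFuture<Void> combined = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            // On a pending input this pushes a new dependent (a UniRun in the JDK)
            // that keeps `combined` reachable until the input completes.
            input.thenRun(() -> combined.complete(null));
        }
        return combined;
    }

    /** Guarded combiner: keeps at most one callback per pending input future instance. */
    static final class GuardedAnyOf {
        private final CompletableFuture<?>[] registered;
        // Assumption: get() runs on one thread; callbacks may fire on any thread.
        private volatile CompletableFuture<Void> combined = new CompletableFuture<>();

        GuardedAnyOf(int inputCount) {
            registered = new CompletableFuture<?>[inputCount];
        }

        CompletableFuture<Void> get(CompletableFuture<?>[] inputs) {
            if (combined.isDone()) {
                combined = new CompletableFuture<>(); // begin a new availability round
            }
            for (int i = 0; i < inputs.length; i++) {
                CompletableFuture<?> input = inputs[i];
                // Skip when our callback is already pending on this same unfinished future.
                if (registered[i] != input || input.isDone()) {
                    registered[i] = input;
                    input.thenRun(this::notifyCompletion);
                }
            }
            return combined;
        }

        private void notifyCompletion() {
            // Completes whichever round is current when the input fires, so a callback
            // registered in an earlier round still wakes up later rounds.
            combined.complete(null);
        }
    }

    public static void main(String[] args) {
        int calls = 1_000;

        CompletableFuture<Void> idleNaive = new CompletableFuture<>(); // e.g. a quiet broadcast input
        for (int i = 0; i < calls; i++) {
            naiveAnyOf(new CompletableFuture<?>[] {CompletableFuture.completedFuture(null), idleNaive});
        }
        System.out.println("naive:   " + idleNaive.getNumberOfDependents() + " dependents");

        CompletableFuture<Void> idleGuarded = new CompletableFuture<>();
        GuardedAnyOf guarded = new GuardedAnyOf(2);
        for (int i = 0; i < calls; i++) {
            guarded.get(new CompletableFuture<?>[] {CompletableFuture.completedFuture(null), idleGuarded});
        }
        System.out.println("guarded: " + idleGuarded.getNumberOfDependents() + " dependents");
    }
}
```

With 1,000 calls, the naive version should leave 1,000 pending dependents on the idle future (as counted by `CompletableFuture.getNumberOfDependents()`), while the guarded version leaves one. Because the guarded callback completes whichever combined future is current when it fires, skipping re-registration on an unchanged pending future does not lose the wake-up from the slow input.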
--
This message was sent by Atlassian Jira
(v8.20.1#820001)