[ 
https://issues.apache.org/jira/browse/FLINK-5652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851882#comment-15851882
 ] 

ASF GitHub Bot commented on FLINK-5652:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3264

    [FLINK-5652] [asyncIO] Cancel timers when completing a 
StreamRecordQueueEntry

    Whenever a StreamRecordQueueEntry has been completed we no longer need the 
registered timeout.
    Therefore, we have to cancel the corresponding ScheduledFuture so that the 
system knows that
    it can remove the associated TriggerTask. This is important since the 
TriggerTask contains a
    reference on the StreamRecordQueueEntry. Consequently, such a task will 
prevent the
    StreamRecordQueueEntry from being garbage collected.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink asyncIOFixTimers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3264.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3264
    
----

----


> Memory leak in AsyncDataStream
> ------------------------------
>
>                 Key: FLINK-5652
>                 URL: https://issues.apache.org/jira/browse/FLINK-5652
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Dmitry Golubets
>            Assignee: Till Rohrmann
>             Fix For: 1.3.0, 1.2.1
>
>
> When async operation timeout is > 0, the number of StreamRecordQueueEntry 
> instances keeps growing.
> It can be easily reproduced with the following code:
> {code}
>   val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)
>   
>   val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
>     override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): 
> Unit = {
>       collector.collect(List(input))
>     }
>   }
>   
>   AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 
> 1).print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to