[ https://issues.apache.org/jira/browse/FLINK-5652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851882#comment-15851882 ]
ASF GitHub Bot commented on FLINK-5652:
---------------------------------------
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/3264
[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry
Once a StreamRecordQueueEntry has been completed, its registered timeout is
no longer needed. We therefore have to cancel the corresponding
ScheduledFuture so that the system knows it can remove the associated
TriggerTask. This is important because the TriggerTask holds a reference to
the StreamRecordQueueEntry; as long as the task stays registered, it
prevents the StreamRecordQueueEntry from being garbage collected.
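The effect described above can be reproduced outside of Flink with plain
java.util.concurrent classes. The following is only a minimal sketch, not the
code from this pull request: Entry and pendingTimersAfter are hypothetical
names standing in for StreamRecordQueueEntry and the operator logic, and
enabling setRemoveOnCancelPolicy on the executor is an assumption of this
sketch about how cancelled tasks get evicted from the timer queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimerCancelSketch {

    /** Hypothetical stand-in for a queue entry; not Flink's StreamRecordQueueEntry. */
    static final class Entry {
        final int input;
        final CompletableFuture<Integer> result = new CompletableFuture<>();

        Entry(int input) {
            this.input = input;
        }
    }

    /** Completes `records` entries and returns how many timeout tasks are still queued. */
    static int pendingTimersAfter(boolean cancelOnComplete, int records) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        // Assumption of this sketch: cancelled tasks are removed from the queue eagerly.
        timer.setRemoveOnCancelPolicy(true);
        try {
            for (int i = 0; i < records; i++) {
                Entry entry = new Entry(i);
                // The timeout task captures `entry`, so the timer queue keeps it reachable.
                ScheduledFuture<?> timeout = timer.schedule(() -> {
                    entry.result.completeExceptionally(new TimeoutException());
                }, 1, TimeUnit.MINUTES);
                if (cancelOnComplete) {
                    // Cancel the timer as soon as the entry completes.
                    entry.result.whenComplete((value, error) -> timeout.cancel(false));
                }
                // The entry completes immediately, long before the timeout fires.
                entry.result.complete(entry.input);
            }
            return timer.getQueue().size();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cancel: " + pendingTimersAfter(false, 1000));
        System.out.println("with cancel:    " + pendingTimersAfter(true, 1000));
    }
}
```

Without the cancel call, every completed entry stays reachable through its
queued timeout task until the one-minute delay expires; with it, the timer
queue is empty again as soon as the entries complete.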
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink asyncIOFixTimers
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3264.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3264
----
> Memory leak in AsyncDataStream
> ------------------------------
>
> Key: FLINK-5652
> URL: https://issues.apache.org/jira/browse/FLINK-5652
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Dmitry Golubets
> Assignee: Till Rohrmann
> Fix For: 1.3.0, 1.2.1
>
>
> When the async operation timeout is > 0, the number of
> StreamRecordQueueEntry instances keeps growing.
> This can easily be reproduced with the following code:
> {code}
> val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)
>
> val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
>   override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
>     collector.collect(List(input))
>   }
> }
>
> AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()
> {code}