[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15130452#comment-15130452
 ] 

Gabor Gevay commented on FLINK-3291:
------------------------------------

OK, sorry, I think I have now understood what you meant by ReduceDriver.run not 
tracking the object returned from the iterator call. The problem here is that 
after 0a8df6d513fa59d650ff875bdf3a1613d0f14af5, I mustn't modify an object that 
I have given to an iterator.next call as a reuse object, because 
MergeIterator.HeadStream.nextHead saves a reference to it, and expects that 
object to not change. But this seems like a rather scary requirement, and I 
wouldn't be sure that some other code besides ReduceDriver somewhere doesn't 
also violate it.

I think that the root cause of these issues, is that the documentation about 
object reuse [1] is rather inadequate in clearly stating what are the contracts 
in this area, so I tried to put together a Google Doc about this: [2]. Could 
you please look at it, and tell me how much it aligns with your way of thinking 
about object reuse?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
[2] 
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -----------------------------------------------------
>
>                 Key: FLINK-3291
>                 URL: https://issues.apache.org/jira/browse/FLINK-3291
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 999977 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to