pnowojski commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r426226419
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
##########
@@ -85,6 +90,6 @@ private int read(byte[] buffer) {
@Override
public void close() throws Exception {
- closeAll(stream, file::release);
+ closeAll(stream, file::release, () ->
buffersToClose.forEach(Buffer::recycleBuffer));
Review comment:
I don't understand it how is it working right now. Where are the non
last buffers recycled? And why do you have to handle the last buffer
differently? As I mentioned before, I think:
> The ownership of the buffers returned from the iterator should belong to
the caller of next() method.
so buffers' ownership should be handed over, and owner should be always
responsible to recycle it when it's done using it. For in memory collections,
all of the remaining buffers should be recycled via closing the iterator (as he
would be the owner of not yet returned buffers).
I guess we are avoiding memory leaks right now, only because the returned
Buffers are not pooled and using simple `FreeingBufferRecycler`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]