rkhachatryan commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r426263155



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
##########
@@ -85,6 +90,6 @@ private int read(byte[] buffer) {
 
        @Override
        public void close() throws Exception {
-               closeAll(stream, file::release);
+               closeAll(stream, file::release, () -> buffersToClose.forEach(Buffer::recycleBuffer));

Review comment:
       > I don't understand how it is working right now. Where are the non-last 
buffers recycled? 
   
   At the beginning of `FileBasedBufferIterator.next()`.
   
   > And why do you have to handle the last buffer differently?
   
   Because `next()` won't be called again after the last buffer is returned, 
so `close()` has to recycle that one.
   
   > The ownership of the buffers returned from the iterator should belong to 
the caller of next() method.
   
   The client doesn't know whether the iterator is lazy or not. So if a failure 
happens before iteration, the client would have to read the whole file just to 
recycle the buffers.
   
   > I guess we are avoiding memory leaks right now only because the returned 
Buffers are not pooled and use a simple FreeingBufferRecycler?
   
   No, the buffers are recycled explicitly (see above).
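
To make the ownership rule above concrete, here is a minimal sketch of the pattern, not the actual `FileBasedBufferIterator` code. `SketchBuffer` and `LazyBufferIterator` are hypothetical stand-ins introduced only for illustration: the iterator recycles the previously returned buffer at the start of each `next()` and recycles the last one in `close()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a buffer; it only records whether it was recycled. */
final class SketchBuffer {
    private boolean recycled;

    void recycleBuffer() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical lazy iterator that keeps ownership of the buffer it returned last. */
final class LazyBufferIterator implements Iterator<SketchBuffer>, AutoCloseable {
    // Every buffer created so far; kept only so the sketch can be inspected.
    private final List<SketchBuffer> handedOut = new ArrayList<>();
    private int remaining;
    // Buffer returned by the most recent next() call, not yet recycled.
    private SketchBuffer current;

    LazyBufferIterator(int count) {
        this.remaining = count;
    }

    @Override
    public boolean hasNext() {
        return remaining > 0;
    }

    @Override
    public SketchBuffer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Non-last buffers are recycled here, before the next one is produced.
        recycleCurrent();
        remaining--;
        current = new SketchBuffer();
        handedOut.add(current);
        return current;
    }

    @Override
    public void close() {
        // next() will not run again, so the last buffer is recycled here.
        recycleCurrent();
    }

    private void recycleCurrent() {
        if (current != null) {
            current.recycleBuffer();
            current = null;
        }
    }

    List<SketchBuffer> handedOut() {
        return handedOut;
    }
}
```

With this arrangement a caller that fails before iterating only needs to call `close()`: no buffer has been created yet, so nothing has to be read from the file just to be recycled.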
   





