Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152836741
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
                        exclusiveBuffers.add(buffer);
                }
     
    -           Buffer takeExclusiveBuffer() {
    -                   return exclusiveBuffers.poll();
    -           }
    -
                void addFloatingBuffer(Buffer buffer) {
                        floatingBuffers.add(buffer);
                }
     
    -           Buffer takeFloatingBuffer() {
    -                   return floatingBuffers.poll();
    +           /**
    +            * Add the exclusive buffer into the queue, and recycle one floating buffer if the
    +            * number of available buffers in queue is more than required amount.
    +            *
    +            * @param buffer The exclusive buffer of this channel.
    +            * @return Whether to recycle one floating buffer.
    +            */
    +           boolean maintainTargetSize(Buffer buffer) {
    +                   exclusiveBuffers.add(buffer);
    +
    +                   if (getAvailableBufferSize() > numRequiredBuffers) {
    +                           Buffer floatingBuffer = floatingBuffers.poll();
    +                           floatingBuffer.recycle();
    +                           return true;
    +                   } else {
    +                           return false;
    +                   }
                }
     
    -           int getFloatingBufferSize() {
    -                   return floatingBuffers.size();
    +           /**
    +            * Take the floating buffer first if possible.
    +            */
    +           @Nullable
    +           Buffer takeBuffer() {
    +                   if (floatingBuffers.size() > 0) {
    +                           return floatingBuffers.poll();
    +                   } else {
    +                           return exclusiveBuffers.poll();
    +                   }
    +           }
    +
    +           /**
    +            * The floating buffer is recycled to local buffer pool directly, and the
    +            * exclusive buffer will be gathered to return to global buffer pool later.
    +            */
    +           void releaseAll(List<MemorySegment> exclusiveSegments) {
    --- End diff --
    
    Please document the `exclusiveSegments` parameter to make it absolutely clear to the user that we will add the memory segments of the exclusive buffers to this list.
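
    To illustrate the kind of Javadoc meant here, a rough sketch follows. It is not the PR's code: the body of `releaseAll` is not visible in this diff, so the loop bodies are only a guess based on the existing comment, and `Segment` and `Buf` are hypothetical stand-ins for `MemorySegment` and `Buffer` so that the snippet compiles on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class ReleaseAllSketch {

    /** Hypothetical stand-in for MemorySegment (assumption, not the Flink class). */
    static final class Segment {}

    /** Hypothetical stand-in for Buffer (assumption, not the Flink class). */
    static final class Buf {
        final Segment segment = new Segment();
        boolean recycled;

        void recycle() {
            recycled = true;
        }
    }

    static final class AvailableBufferQueue {
        final ArrayDeque<Buf> floatingBuffers = new ArrayDeque<>();
        final ArrayDeque<Buf> exclusiveBuffers = new ArrayDeque<>();

        /**
         * The floating buffer is recycled to local buffer pool directly, and the
         * exclusive buffer will be gathered to return to global buffer pool later.
         *
         * @param exclusiveSegments Output list supplied by the caller. The memory
         *     segments of all exclusive buffers in this queue are added to it;
         *     the caller is expected to return them to the global buffer pool.
         */
        void releaseAll(List<Segment> exclusiveSegments) {
            Buf buffer;
            // Assumed behavior: floating buffers are recycled immediately.
            while ((buffer = floatingBuffers.poll()) != null) {
                buffer.recycle();
            }
            // Assumed behavior: exclusive buffers are not recycled here; only
            // their segments are handed back through the output list.
            while ((buffer = exclusiveBuffers.poll()) != null) {
                exclusiveSegments.add(buffer.segment);
            }
        }
    }

    public static void main(String[] args) {
        AvailableBufferQueue queue = new AvailableBufferQueue();
        queue.floatingBuffers.add(new Buf());
        queue.exclusiveBuffers.add(new Buf());
        queue.exclusiveBuffers.add(new Buf());

        List<Segment> collected = new ArrayList<>();
        queue.releaseAll(collected);
        System.out.println("exclusive segments collected: " + collected.size());
    }
}
```

    The point is only the `@param` text: it states that the list is an output parameter, that it is the caller's list that gets mutated, and what the caller should do with the collected segments afterwards.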

