Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6254#discussion_r201272052

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -594,22 +626,22 @@ public String getMessage() {
     		}

     		/**
    -		 * Adds an exclusive buffer (back) into the queue and recycles one floating buffer if the
    +		 * Adds an exclusive buffer (back) into the queue and removes one floating buffer if the
     		 * number of available buffers in queue is more than the required amount.
     		 *
     		 * @param buffer The exclusive buffer to add
     		 * @param numRequiredBuffers The number of required buffers
     		 *
    -		 * @return How many buffers were added to the queue
    +		 * @return How many buffers were added to the queue (<tt>0</tt> or <tt>1</tt>) and the
    +		 * 	floating buffer which was removed and should be released (outside!)
     		 */
    -		int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
    +		Tuple2<Integer, Optional<Buffer>> addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
     			exclusiveBuffers.add(buffer);
     			if (getAvailableBufferSize() > numRequiredBuffers) {
    --- End diff --

    True, we could change that, but the `BufferQueue` logic in `addExclusiveBuffer` and `addFloatingBuffer` does not enforce this contract (only outside code does). If outside code decides to break this contract (I can't think of a reason why), then the current one-for-one logic with `getAvailableBufferSize() > numRequiredBuffers` ensures that we do not deviate further from the target. This is purely a theoretical concern and definitely out of the scope of this ticket - I would leave it unchanged for now since we don't gain anything from that change.
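
    To illustrate the one-for-one behaviour described in the comment, here is a minimal sketch. It is not the Flink implementation: `BufferQueueSketch` and `AddResult` are hypothetical names, buffers are plain strings, and `AddResult` stands in for the `Tuple2<Integer, Optional<Buffer>>` return type from the diff. The count returned when no floating buffer is available to remove is also an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Optional;

/** Simplified, hypothetical stand-in for the BufferQueue discussed above. */
final class BufferQueueSketch {

    /** Hypothetical result type: buffers added (0 or 1) plus the removed floating buffer. */
    static final class AddResult {
        public final int numAdded;
        public final Optional<String> removedFloating;

        AddResult(int numAdded, Optional<String> removedFloating) {
            this.numAdded = numAdded;
            this.removedFloating = removedFloating;
        }
    }

    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addFloatingBuffer(String buffer) {
        floatingBuffers.add(buffer);
    }

    int getAvailableBufferSize() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    AddResult addExclusiveBuffer(String buffer, int numRequiredBuffers) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers) {
            // One in, at most one out: poll() yields null if no floating buffer is queued.
            String removed = floatingBuffers.poll();
            // Assumption: report the net change in queue size.
            return new AddResult(removed == null ? 1 : 0, Optional.ofNullable(removed));
        }
        return new AddResult(1, Optional.empty());
    }
}
```

    If outside code has already pushed the queue above the target (say three floating buffers with `numRequiredBuffers = 2`), adding an exclusive buffer removes exactly one floating buffer, so the queue stays at three and does not grow further. The caller is then responsible for releasing the returned floating buffer.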
---