Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r143230905
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -306,10 +311,27 @@ public void recycle(MemorySegment segment) {
ExceptionUtils.rethrow(t);
}
}
+
+ // Recycle the extra floating buffers in order not to
stack up 2*initialCredit
+ // buffers once current backlog is 0
+ if (senderBacklog.get() == 0 && availableBuffers.size()
>= initialCredit) {
+ final int size = availableBuffers.size();
+ for (int i = 0; i < size; i++) {
+ final Buffer buffer =
availableBuffers.poll();
--- End diff --
I wonder if it would make things simpler if we had two separate lists of
buffers: one for exclusive buffers and one for floating buffers. Then you would
at least not have to iterate through the list. When retrieving buffers, you'd
always take the floating buffers first (so we don't get into this situation too
often). If that makes it simpler, we could also extract this logic into a
separate `AvailableBuffersQueue` class or so. Regarding testability, this may
also make sense.
---