Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141902956
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -70,6 +79,21 @@
*/
private int expectedSequenceNumber = 0;
+ /** The initial number of exclusive buffers assigned to this channel. */
+ private int initialCredit;
+
+ /** The current available buffers including both exclusive buffers and
requested floating buffers. */
+ private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+
+ /** The number of available buffers that have not been announced to the
producer yet. */
+ private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
+
+ /** The number of unsent buffers in the producer's sub partition. */
+ private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+ /** The tag indicates whether this channel is waiting for additional
floating buffers from the buffer pool. */
+ private final AtomicBoolean isWaitingForFloatingBuffers = new
AtomicBoolean(false);
--- End diff --
Now seeing this in action: do we really need a `AtomicBoolean`? Or is a
`volatile boolean` enough? All uses except for `notifyBufferDestroyed()` (where
only a safety-check uses the value) are actually under a `synchronized
(availableBuffers)` block...in this case, you may also annotate the variable as
`@GuardedBy("availableBuffers")` for documentation.
---