Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4559#discussion_r157548033
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
+ /** The number of non-event buffers currently in this subpartition */
+ @GuardedBy("buffers")
+ private volatile int buffersInBacklog;
--- End diff --
I shortly thought about relying on `buffers.size()` here to reduce
complexity and code, but `ArrayDeque#size()` (for `getBuffersInBacklog()`) may
show some race conditions then without synchronisation. However, if we picked
up the idea again of returning the backlog size with the buffer itself (which
is retrieved under the lock), i.e. similar to `BufferAndAvailability` being
returned by the `SequenceNumberingViewReader`, this would work and we would not
need the `volatile` here. Since you split the implementations into
`PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a
viable approach again.
What do you think? What would you prefer?
---