Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/4559#discussion_r157691096
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
+ /** The number of non-event buffers currently in this subpartition */
+ @GuardedBy("buffers")
+ private volatile int buffersInBacklog;
--- End diff --
Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible,
because we do not know how many events are in the `ArrayDeque`, and events
should not be counted toward the backlog length.
For the new API, we may need to modify
`ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper
structure instead of `Buffer`. Do we also need to extend
`BufferAndAvailability` to carry the backlog? This way, `PipelinedSubpartition`
would benefit by dropping the `volatile`, but for `SpillableSubpartition` the
`volatile` may still be needed, because `getNextBuffer` and `decreaseBacklog`
are handled in different parts for
`SpillableSubpartitionView`/`SpilledSubpartitionView`.
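
To make the idea concrete, here is a rough sketch of what I have in mind. It is
not the actual Flink code: `Entry`, `EntryAndBacklog`, `Subpartition`,
`pollNext` and `queueSize` are hypothetical stand-ins for `Buffer`,
`BufferAndBacklog`, `PipelinedSubpartition` and `getNextBuffer`. It only shows
that, when the counter is updated and read under the same lock that guards the
queue, the backlog can be returned together with the buffer and the `volatile`
is not needed.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-ins for illustration only; these are not Flink's classes. */
final class BacklogSketch {

    /** Stand-in for a queue entry: either a data buffer or an event. */
    static final class Entry {
        final boolean isBuffer;

        Entry(boolean isBuffer) {
            this.isBuffer = isBuffer;
        }
    }

    /** Wrapper that carries the polled entry plus the backlog observed at poll time. */
    static final class EntryAndBacklog {
        final Entry entry;
        final int buffersInBacklog;

        EntryAndBacklog(Entry entry, int buffersInBacklog) {
            this.entry = entry;
            this.buffersInBacklog = buffersInBacklog;
        }
    }

    /** Stand-in for a pipelined subpartition. */
    static final class Subpartition {
        private final ArrayDeque<Entry> buffers = new ArrayDeque<>();

        // Guarded by "buffers". Every access happens inside the lock,
        // so the field does not have to be volatile in this sketch.
        private int buffersInBacklog;

        void add(Entry entry) {
            synchronized (buffers) {
                buffers.add(entry);
                if (entry.isBuffer) {
                    buffersInBacklog++; // events are not counted
                }
            }
        }

        /** Returns the next entry with the remaining backlog, or null if the queue is empty. */
        EntryAndBacklog pollNext() {
            synchronized (buffers) {
                Entry entry = buffers.poll();
                if (entry == null) {
                    return null;
                }
                if (entry.isBuffer) {
                    buffersInBacklog--;
                }
                return new EntryAndBacklog(entry, buffersInBacklog);
            }
        }

        /** Total queue length, including events; this is what ArrayDeque#size() would report. */
        int queueSize() {
            synchronized (buffers) {
                return buffers.size();
            }
        }
    }
}
```

With a queue holding buffer, event, buffer, `queueSize()` reports 3 while the
backlog is 2, which is the mismatch described above. This only covers the
pipelined case; for the spillable views, where the read and the decrement are
not under one lock, the sketch does not apply as is.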
---