Github user NicoK commented on the issue:
https://github.com/apache/flink/pull/4559
I'd opt for decreasing under the lock as well, or (even better?) the
following alternative:
We remove the `buffersInBacklog` member and return the size of the current
backlog along the `getNextBuffer()` calls just like `BufferAndAvailability` is
returned by the `SequenceNumberingViewReader`. This `getNextBuffer()` call is
under the lock already and (except for `toString()` methods in
`PipelinedSubpartition` and `SpillableSubpartition`) the backlog is only
requested along the `getNextBuffer()` calls.
---