Github user zhijiangW commented on the issue:
https://github.com/apache/flink/pull/4559
@NicoK, thanks for the suggestions.
I understand your point of wrapping the buffer and backlog together in a
new structure returned by `getNextBuffer()` and it really makes sense for
`PipelinedSubpartition`. But for `SpillableSubpartition`, once it begins to
write buffers to disk, we cannot get the total backlog that way. We can
only get the precise backlog by decrementing it by one in `getNextBuffer()` and
incrementing it by one in `add(Buffer)`.
So I think we can put the `decreaseStatistics` call under the lock, which would
cover all the subpartitions.
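
To make the idea concrete, here is a minimal sketch of the counting approach. It is not the actual Flink code: the class `BacklogCountingSubpartition`, the `String` stand-in for `Buffer`, and the `getBacklog()` accessor are all hypothetical names used only for illustration. The backlog is kept as a separate counter that is updated under the same lock as the queue, so it would stay accurate even if the buffers themselves were no longer held in memory.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a subpartition; not Flink's actual class. */
class BacklogCountingSubpartition {
    private final Object lock = new Object();
    // Strings stand in for Buffer instances in this sketch.
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    // Counter maintained independently of where the buffers are stored.
    private int backlog;

    void add(String buffer) {
        synchronized (lock) {
            buffers.add(buffer);
            backlog++; // increase by one for every added buffer
        }
    }

    String getNextBuffer() {
        synchronized (lock) {
            String next = buffers.poll();
            if (next != null) {
                backlog--; // decrease by one under the same lock
            }
            return next;
        }
    }

    int getBacklog() {
        synchronized (lock) {
            return backlog;
        }
    }
}
```

Because the increment and the decrement both happen inside the lock, a reader of `getBacklog()` never sees a count that disagrees with the buffers added and consumed so far.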
---