GitHub user zhijiangW commented on the issue:
https://github.com/apache/flink/pull/4559
Alternatively, we could make the backlog an `AtomicInteger` to keep the current
flow. Otherwise, we may need to call `decreaseStatistics` from several places
inside the `synchronized(buffers)` block. What do you think?
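
To make the trade-off concrete, here is a minimal sketch of the `AtomicInteger` variant. `BacklogSketch` and its `add`, `poll`, and `getBacklog` methods are hypothetical stand-ins and not the actual classes in this PR. The body of `decreaseStatistics` is also assumed for illustration. The point is only that the counter update can stay outside `synchronized(buffers)`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a subpartition; not the class touched by this PR.
class BacklogSketch {
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

    // Assumption: the backlog counts buffers that are queued but not yet consumed.
    private final AtomicInteger backlog = new AtomicInteger();

    void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
        // Atomic update, so no lock on `buffers` is needed here.
        backlog.incrementAndGet();
    }

    byte[] poll() {
        byte[] buffer;
        synchronized (buffers) {
            buffer = buffers.poll();
        }
        if (buffer != null) {
            // Called outside the synchronized region, as in the current flow.
            decreaseStatistics();
        }
        return buffer;
    }

    // Assumed body: only the backlog counter is adjusted in this sketch.
    private void decreaseStatistics() {
        backlog.decrementAndGet();
    }

    int getBacklog() {
        return backlog.get();
    }
}
```

One caveat with this approach: because the counter is updated outside the lock, a concurrent reader may briefly see a backlog value that lags behind the queue contents. If an exact value is required at all times, the updates would have to move inside `synchronized(buffers)` instead.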
