Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/6238#discussion_r199676237
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
---
@@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {
this.numTotalRequiredBuffers += numRequiredBuffers;
- redistributeBuffers();
+ try {
+ redistributeBuffers();
+ } catch (Throwable t) {
+ this.numTotalRequiredBuffers -= numRequiredBuffers;
+ ExceptionUtils.rethrowIOException(t);
+ }
}
final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
--- End diff --
The subsequent `availableMemorySegments.poll(2, TimeUnit.SECONDS)` call may
throw an `InterruptedException`, and the `recycleMemorySegments(segments)`
call in the catch block only does `numTotalRequiredBuffers -= segments.size();`.
At that point `segments` may hold fewer than `numRequiredBuffers` entries.
I think we should call `recycleMemorySegments(numRequiredBuffers, segments)`
instead and do `numTotalRequiredBuffers -= numRequiredBuffers;` inside it;
otherwise part of `numTotalRequiredBuffers` is leaked.
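
To illustrate the accounting problem, here is a minimal, self-contained sketch. It is not the actual `NetworkBufferPool` code: the class `BufferAccountingSketch`, the `useFix` flag, the helper `countAfterInterrupt`, and the use of plain `Object` in place of `MemorySegment` are all assumptions made for the example. It only models the counter and the blocking queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for NetworkBufferPool (illustration only). */
class BufferAccountingSketch {
    private final BlockingQueue<Object> availableMemorySegments;
    private final boolean useFix; // hypothetical switch between the two variants
    private int numTotalRequiredBuffers;

    BufferAccountingSketch(int numSegments, boolean useFix) {
        this.availableMemorySegments = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            availableMemorySegments.add(new Object());
        }
        this.useFix = useFix;
    }

    List<Object> requestMemorySegments(int numRequiredBuffers) throws InterruptedException {
        numTotalRequiredBuffers += numRequiredBuffers;
        List<Object> segments = new ArrayList<>(numRequiredBuffers);
        try {
            while (segments.size() < numRequiredBuffers) {
                Object segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (InterruptedException e) {
            // segments may be only partially filled when we get here.
            recycleMemorySegments(numRequiredBuffers, segments);
            throw e;
        }
        return segments;
    }

    private void recycleMemorySegments(int numRequiredBuffers, List<Object> segments) {
        availableMemorySegments.addAll(segments);
        if (useFix) {
            // Proposed variant: undo the full reservation.
            numTotalRequiredBuffers -= numRequiredBuffers;
        } else {
            // Variant under review: undo only what was actually polled.
            numTotalRequiredBuffers -= segments.size();
        }
    }

    /** Requests 3 buffers on an already interrupted thread and returns the counter. */
    static int countAfterInterrupt(boolean useFix) {
        BufferAccountingSketch pool = new BufferAccountingSketch(1, useFix);
        Thread.currentThread().interrupt();
        try {
            pool.requestMemorySegments(3);
        } catch (InterruptedException expected) {
            // The interrupt flag is cleared when the exception is thrown.
        }
        return pool.numTotalRequiredBuffers;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + countAfterInterrupt(false));
        System.out.println("with fix:    " + countAfterInterrupt(true));
    }
}
```

In this sketch the interrupted request leaves the counter at 3 when only `segments.size()` is subtracted, and at 0 when `numRequiredBuffers` is subtracted, which is the leak described above.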
---