Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141945644
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
---
@@ -131,6 +136,63 @@ public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
}
+ public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
+ checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
+
+ synchronized (factoryLock) {
+ if (isDestroyed) {
+ throw new IllegalStateException("Network buffer pool has already been destroyed.");
+ }
+
+ if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+ throw new IOException(String.format("Insufficient number of network buffers: " +
--- End diff --
How about adding a `FlinkNetworkException` or `FlinkNetworkIOException` for
all network/Netty-related exceptions? That would help users identify the root
causes of problems more easily.
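
A minimal sketch of what such a type could look like. The class is hypothetical and does not exist in Flink as far as this diff shows; the sketch assumes it extends `IOException`, so the `throws IOException` clause on `requestMemorySegments` would not need to change:

```java
import java.io.IOException;

/**
 * Hypothetical exception for network/Netty-related failures.
 * The name comes from the suggestion above; it is not an existing Flink class.
 */
class FlinkNetworkIOException extends IOException {

    private static final long serialVersionUID = 1L;

    /** Creates the exception with a description of the network problem. */
    FlinkNetworkIOException(String message) {
        super(message);
    }

    /** Creates the exception and keeps the underlying cause for root-cause analysis. */
    FlinkNetworkIOException(String message, Throwable cause) {
        super(message, cause);
    }
}
```

Callers that already catch `IOException` would keep working, while users could catch or filter on the more specific type when looking for network problems.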
---