Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136018729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) { availableMemorySegments.add(segment); } + public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException { + checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); + + synchronized (factoryLock) { + if (isDestroyed) { + throw new IllegalStateException("Network buffer pool has already been destroyed."); + } + + if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) { + throw new IOException(String.format("Insufficient number of network buffers: " + + "required %d, but only %d available. The total number of network " + + "buffers is currently set to %d of %d bytes each. You can increase this " + + "number by setting the configuration keys '%s', '%s', and '%s'.", + numRequiredBuffers, + totalNumberOfMemorySegments - numTotalRequiredBuffers, + totalNumberOfMemorySegments, + memorySegmentSize, + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key())); + } + + this.numTotalRequiredBuffers += numRequiredBuffers; + + final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { + segments.add(availableMemorySegments.poll()); + } + + try { + redistributeBuffers(); --- End diff -- There are still some corner cases not handled properly by this implementation: consider the following unit test (please also add it to `NetworkBufferPoolTest` or `BufferPoolFactoryTest`): ``` /** * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} * currently not containing the number of required free segments (currently occupied by a buffer * pool). */ @Test public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { final int numBuffers = 10; NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); final List<Buffer> buffers = new ArrayList<>(numBuffers); List<MemorySegment> memorySegments = Collections.emptyList(); Thread bufferRecycler = null; BufferPool lbp1 = null; try { lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers); // take all buffers (more than the minimum required) for (int i = 0; i < numBuffers; ++i) { Buffer buffer = lbp1.requestBuffer(); buffers.add(buffer); assertNotNull(buffer); } // if requestMemorySegments() blocks, this will make sure that enough buffers are freed // eventually for it to continue bufferRecycler = new Thread(() -> { try { Thread.sleep(100); } catch (InterruptedException ignored) { } for (Buffer buffer : buffers) { buffer.recycle(); } }); bufferRecycler.start(); // take more buffers than are freely available at the moment via requestMemorySegments() memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2); assertThat(memorySegments, not(hasItem(nullValue()))); } finally { if (bufferRecycler != null) { bufferRecycler.join(); } if (lbp1 != null) { lbp1.lazyDestroy(); } networkBufferPool.recycleMemorySegments(memorySegments); } } ``` Either all code using this method or the resulting list handles `null` elements in the returned list (similar to `requestMemorySegment()` used by `LocalBufferPool#requestBuffer()`) or you must block until you can deliver all requested elements. I prefer the second option (a method Javadoc should be added to describe the implemented behaviour) in which case, however, you will have to call `redistributeBuffers()` before acquiring the segments and you also have to be careful with the `factoryLock` - I suppose the following may be correct: ``` public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException { checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); synchronized (factoryLock) { // ... this.numTotalRequiredBuffers += numRequiredBuffers; redistributeBuffers(); } final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers); for (int i = 0 ; i < numRequiredBuffers ; i++) { try { segments.add(availableMemorySegments.take()); } catch (InterruptedException e) { recycleMemorySegments(segments); ExceptionUtils.rethrowIOException(e); } } return segments; } ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---