[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147000#comment-16147000 ]
ASF GitHub Bot commented on FLINK-7378: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136018729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) { availableMemorySegments.add(segment); } + public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException { + checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); + + synchronized (factoryLock) { + if (isDestroyed) { + throw new IllegalStateException("Network buffer pool has already been destroyed."); + } + + if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) { + throw new IOException(String.format("Insufficient number of network buffers: " + + "required %d, but only %d available. The total number of network " + + "buffers is currently set to %d of %d bytes each. You can increase this " + + "number by setting the configuration keys '%s', '%s', and '%s'.", + numRequiredBuffers, + totalNumberOfMemorySegments - numTotalRequiredBuffers, + totalNumberOfMemorySegments, + memorySegmentSize, + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key())); + } + + this.numTotalRequiredBuffers += numRequiredBuffers; + + final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { + segments.add(availableMemorySegments.poll()); + } + + try { + redistributeBuffers(); --- End diff -- There are still some corner cases not handled properly by this implementation: consider the following unit test (please also add it to `NetworkBufferPoolTest` or `BufferPoolFactoryTest`): ``` /** * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} * currently not containing the number of required free segments (currently occupied by a buffer * pool). */ @Test public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { final int numBuffers = 10; NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); final List<Buffer> buffers = new ArrayList<>(numBuffers); List<MemorySegment> memorySegments = Collections.emptyList(); Thread bufferRecycler = null; BufferPool lbp1 = null; try { lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers); // take all buffers (more than the minimum required) for (int i = 0; i < numBuffers; ++i) { Buffer buffer = lbp1.requestBuffer(); buffers.add(buffer); assertNotNull(buffer); } // if requestMemorySegments() blocks, this will make sure that enough buffers are freed // eventually for it to continue bufferRecycler = new Thread(() -> { try { Thread.sleep(100); } catch (InterruptedException ignored) { } for (Buffer buffer : buffers) { buffer.recycle(); } }); bufferRecycler.start(); // take more buffers than are freely available at the moment via requestMemorySegments() memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2); assertThat(memorySegments, not(hasItem(nullValue()))); } finally { if (bufferRecycler != null) { bufferRecycler.join(); } if (lbp1 != null) { lbp1.lazyDestroy(); } networkBufferPool.recycleMemorySegments(memorySegments); } } ``` Either all code using this method or the resulting list handles `null` elements in the returned list (similar to `requestMemorySegment()` used by `LocalBufferPool#requestBuffer()`) or you must block until you can deliver all requested elements. I prefer the second option (a method Javadoc should be added to describe the implemented behaviour) in which case, however, you will have to call `redistributeBuffers()` before acquiring the segments and you also have to be careful with the `factoryLock` - I suppose the following may be correct: ``` public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException { checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); synchronized (factoryLock) { // ... this.numTotalRequiredBuffers += numRequiredBuffers; redistributeBuffers(); } final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers); for (int i = 0 ; i < numRequiredBuffers ; i++) { try { segments.add(availableMemorySegments.take()); } catch (InterruptedException e) { recycleMemorySegments(segments); ExceptionUtils.rethrowIOException(e); } } return segments; } ``` > Create a fix size (non rebalancing) buffer pool type for the floating buffers > ----------------------------------------------------------------------------- > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core > Reporter: zhijiang > Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}}s directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)