Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136018729
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
    @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
                availableMemorySegments.add(segment);
        }
     
    +   public List<MemorySegment> requestMemorySegments(int 
numRequiredBuffers) throws IOException {
    +           checkArgument(numRequiredBuffers > 0, "The number of required 
buffers should be larger than 0.");
    +
    +           synchronized (factoryLock) {
    +                   if (isDestroyed) {
    +                           throw new IllegalStateException("Network buffer 
pool has already been destroyed.");
    +                   }
    +
    +                   if (numTotalRequiredBuffers + numRequiredBuffers > 
totalNumberOfMemorySegments) {
    +                           throw new 
IOException(String.format("Insufficient number of network buffers: " +
    +                                                           "required %d, 
but only %d available. The total number of network " +
    +                                                           "buffers is 
currently set to %d of %d bytes each. You can increase this " +
    +                                                           "number by 
setting the configuration keys '%s', '%s', and '%s'.",
    +                                           numRequiredBuffers,
    +                                           totalNumberOfMemorySegments - 
numTotalRequiredBuffers,
    +                                           totalNumberOfMemorySegments,
    +                                           memorySegmentSize,
    +                                           
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
    +                                           
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
    +                                           
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
    +                   }
    +
    +                   this.numTotalRequiredBuffers += numRequiredBuffers;
    +
    +                   final List<MemorySegment> segments = new 
ArrayList<>(numRequiredBuffers);
    +                   for (int i = 0 ; i < numRequiredBuffers ; i++) {
    +                           segments.add(availableMemorySegments.poll());
    +                   }
    +
    +                   try {
    +                           redistributeBuffers();
    --- End diff --
    
    There are still some corner cases not handled properly by this 
implementation: consider the following unit test (please also add it to 
`NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
    ```
        /**
         * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
         * currently not containing the number of required free segments 
(currently occupied by a buffer
         * pool).
         */
        @Test
        public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
                final int numBuffers = 10;
    
                NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    
                final List<Buffer> buffers = new ArrayList<>(numBuffers);
                List<MemorySegment> memorySegments = Collections.emptyList();
                Thread bufferRecycler = null;
                BufferPool lbp1 = null;
                try {
                        lbp1 = networkBufferPool.createBufferPool(numBuffers / 
2, numBuffers);
    
                        // take all buffers (more than the minimum required)
                        for (int i = 0; i < numBuffers; ++i) {
                                Buffer buffer = lbp1.requestBuffer();
                                buffers.add(buffer);
                                assertNotNull(buffer);
                        }
    
                        // if requestMemorySegments() blocks, this will make 
sure that enough buffers are freed
                        // eventually for it to continue
                        bufferRecycler = new Thread(() -> {
                                try {
                                        Thread.sleep(100);
                                } catch (InterruptedException ignored) {
                                }
    
                                for (Buffer buffer : buffers) {
                                        buffer.recycle();
                                }
                        });
                        bufferRecycler.start();
    
                        // take more buffers than are freely available at the 
moment via requestMemorySegments()
                        memorySegments = 
networkBufferPool.requestMemorySegments(numBuffers / 2);
                        assertThat(memorySegments, not(hasItem(nullValue())));
                } finally {
                        if (bufferRecycler != null) {
                                bufferRecycler.join();
                        }
                        if (lbp1 != null) {
                                lbp1.lazyDestroy();
                        }
                        networkBufferPool.recycleMemorySegments(memorySegments);
                }
        }
    ```
    
    Either all code using this method or the resulting list handles `null` 
elements in the returned list (similar to `requestMemorySegment()` used by 
`LocalBufferPool#requestBuffer()`) or you must block until you can deliver all 
requested elements. I prefer the second option (a method Javadoc should be 
added to describe the implemented behaviour) in which case, however, you will 
have to call `redistributeBuffers()` before acquiring the segments and you also 
have to be careful with the `factoryLock` - I suppose the following may be 
correct:
    
    ```
        public List<MemorySegment> requestMemorySegments(int 
numRequiredBuffers) throws IOException {
                checkArgument(numRequiredBuffers > 0, "The number of required 
buffers should be larger than 0.");
    
                synchronized (factoryLock) {
                        // ...
    
                        this.numTotalRequiredBuffers += numRequiredBuffers;
    
                        redistributeBuffers();
                }
    
                final List<MemorySegment> segments = new 
ArrayList<>(numRequiredBuffers);
                for (int i = 0 ; i < numRequiredBuffers ; i++) {
                        try {
                                segments.add(availableMemorySegments.take());
                        } catch (InterruptedException e) {
                                recycleMemorySegments(segments);
                                ExceptionUtils.rethrowIOException(e);
                        }
                }
    
                return segments;
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to