[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147000#comment-16147000
 ] 

ASF GitHub Bot commented on FLINK-7378:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136018729
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
                availableMemorySegments.add(segment);
        }
     
    +   public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
    +           checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    +
    +           synchronized (factoryLock) {
    +                   if (isDestroyed) {
    +                           throw new IllegalStateException("Network buffer pool has already been destroyed.");
    +                   }
    +
    +                   if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
    +                           throw new IOException(String.format("Insufficient number of network buffers: " +
    +                                                           "required %d, but only %d available. The total number of network " +
    +                                                           "buffers is currently set to %d of %d bytes each. You can increase this " +
    +                                                           "number by setting the configuration keys '%s', '%s', and '%s'.",
    +                                           numRequiredBuffers,
    +                                           totalNumberOfMemorySegments - numTotalRequiredBuffers,
    +                                           totalNumberOfMemorySegments,
    +                                           memorySegmentSize,
    +                                           TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
    +                                           TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
    +                                           TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
    +                   }
    +
    +                   this.numTotalRequiredBuffers += numRequiredBuffers;
    +
    +                   final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +                   for (int i = 0 ; i < numRequiredBuffers ; i++) {
    +                           segments.add(availableMemorySegments.poll());
    +                   }
    +
    +                   try {
    +                           redistributeBuffers();
    --- End diff --
    
    There are still some corner cases that this implementation does not handle
    properly. Consider the following unit test (please also add it to
    `NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
    ```
        /**
         * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
         * currently not containing the number of required free segments (currently occupied by a buffer
         * pool).
         */
        @Test
        public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
                final int numBuffers = 10;
    
                NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    
                final List<Buffer> buffers = new ArrayList<>(numBuffers);
                List<MemorySegment> memorySegments = Collections.emptyList();
                Thread bufferRecycler = null;
                BufferPool lbp1 = null;
                try {
                        lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers);
    
                        // take all buffers (more than the minimum required)
                        for (int i = 0; i < numBuffers; ++i) {
                                Buffer buffer = lbp1.requestBuffer();
                                buffers.add(buffer);
                                assertNotNull(buffer);
                        }
    
                        // if requestMemorySegments() blocks, this will make sure that enough buffers are freed
                        // eventually for it to continue
                        bufferRecycler = new Thread(() -> {
                                try {
                                        Thread.sleep(100);
                                } catch (InterruptedException ignored) {
                                }
    
                                for (Buffer buffer : buffers) {
                                        buffer.recycle();
                                }
                        });
                        bufferRecycler.start();
    
                        // take more buffers than are freely available at the moment via requestMemorySegments()
                        memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
                        assertThat(memorySegments, not(hasItem(nullValue())));
                } finally {
                        if (bufferRecycler != null) {
                                bufferRecycler.join();
                        }
                        if (lbp1 != null) {
                                lbp1.lazyDestroy();
                        }
                        networkBufferPool.recycleMemorySegments(memorySegments);
                }
        }
    ```
    
    Either every caller of this method must handle `null` elements in the
    returned list (similar to `requestMemorySegment()` as used by
    `LocalBufferPool#requestBuffer()`), or the method must block until it can
    deliver all requested segments. I prefer the second option (please add a
    method Javadoc describing the implemented behaviour). In that case, however,
    you will have to call `redistributeBuffers()` before acquiring the segments,
    and you also have to be careful with the `factoryLock`. I suppose the
    following may be correct:
    
    ```
        public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
                checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    
                synchronized (factoryLock) {
                        // ...
    
                        this.numTotalRequiredBuffers += numRequiredBuffers;
    
                        redistributeBuffers();
                }
    
                final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
                for (int i = 0 ; i < numRequiredBuffers ; i++) {
                        try {
                                segments.add(availableMemorySegments.take());
                        } catch (InterruptedException e) {
                                recycleMemorySegments(segments);
                                ExceptionUtils.rethrowIOException(e);
                        }
                }
    
                return segments;
        }
    ```
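
To illustrate the difference between the two options, here is a minimal, self-contained sketch. It uses a plain `java.util.concurrent.ArrayBlockingQueue` of strings as a stand-in for `availableMemorySegments` (an assumption). The names `PollVersusTake`, `pollAll`, and `takeAll` are hypothetical and not part of Flink. It only shows that `poll()` yields `null` entries when the queue is empty, while `take()` waits until another thread returns elements:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-alone illustration; not Flink code.
class PollVersusTake {

    /** Non-blocking variant: poll() returns null for every slot the queue cannot fill. */
    static List<String> pollAll(BlockingQueue<String> queue, int n) {
        List<String> result = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            result.add(queue.poll());
        }
        return result;
    }

    /** Blocking variant: take() waits until an element becomes available. */
    static List<String> takeAll(BlockingQueue<String> queue, int n) throws InterruptedException {
        List<String> result = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            result.add(queue.take());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Empty queue: the caller receives a list full of nulls.
        BlockingQueue<String> empty = new ArrayBlockingQueue<>(4);
        System.out.println(pollAll(empty, 2)); // prints [null, null]

        // Same situation with take(): a second thread "recycles" elements
        // after a short delay, and takeAll() returns only once both arrived.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        Thread recycler = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            queue.add("segment-0");
            queue.add("segment-1");
        });
        recycler.start();
        System.out.println(takeAll(queue, 2)); // prints [segment-0, segment-1]
        recycler.join();
    }
}
```

The blocking variant never hands out `null`, but it depends on some other thread eventually returning elements, which is why the ordering relative to `redistributeBuffers()` and the `factoryLock` matters in the real method.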


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7378
>                 URL: https://issues.apache.org/jira/browse/FLINK-7378
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are then assigned to {{InputChannel}}s directly.
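
As a worked example of the limit quoted above, the following sketch evaluates `a * <number of channels> + b`. The class `BufferLimit`, the method `maxBuffers`, and the sample values are hypothetical and chosen only for illustration; they are not Flink defaults:

```java
// Hypothetical helper; not part of Flink.
class BufferLimit {

    /** Upper bound on buffers for one input gate: a * numChannels + b. */
    static int maxBuffers(int exclusivePerChannel, int numChannels, int floatingBuffers) {
        return exclusivePerChannel * numChannels + floatingBuffers;
    }

    public static void main(String[] args) {
        // 2 exclusive buffers per channel, 4 channels, 8 floating buffers.
        System.out.println(maxBuffers(2, 4, 8)); // prints 16
    }
}
```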



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
