[ 
https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373092#comment-16373092
 ] 

ASF GitHub Bot commented on FLINK-8747:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5558#discussion_r170028724
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -396,32 +399,49 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
                                18, inputChannel.getNumberOfRequiredBuffers());
                        assertEquals("There should be 0 buffer available in local pool",
                                0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   assertTrue(inputChannel.isWaitingForFloatingBuffers());
     
                        // Decrease the backlog
    -                   inputChannel.onSenderBacklog(15);
    +                   inputChannel.onSenderBacklog(13);
     
                        // Only the number of required buffers is changed by (backlog + numExclusiveBuffers)
                        verify(bufferPool, times(15)).requestBuffer();
                        verify(bufferPool, times(1)).addBufferListener(inputChannel);
                        assertEquals("There should be 15 buffers available in the channel",
                                15, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 17 buffers required in the channel",
    -                           17, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 15 buffers required in the channel",
    +                           15, inputChannel.getNumberOfRequiredBuffers());
                        assertEquals("There should be 0 buffer available in local pool",
                                0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   assertTrue(inputChannel.isWaitingForFloatingBuffers());
     
    -                   // Recycle one exclusive buffer
    -                   exclusiveBuffer.recycleBuffer();
    +                   // Recycle one more floating buffer
    +                   floatingBufferQueue.poll().recycleBuffer();
     
    -                   // The exclusive buffer is returned to the channel directly
    +                   // Return the floating buffer to the buffer pool and the channel is not waiting for more floating buffers
                        verify(bufferPool, times(15)).requestBuffer();
                        verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +                   assertEquals("There should be 15 buffers available in the channel",
    +                           15, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 15 buffers required in the channel",
    +                           15, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 1 buffers available in local pool",
    +                           1, bufferPool.getNumberOfAvailableMemorySegments());
    +                   assertFalse(inputChannel.isWaitingForFloatingBuffers());
    +
    +                   // Increase the backlog again
    +                   inputChannel.onSenderBacklog(15);
    +
    +                   // The floating buffer is requested from the buffer pool and the channel is registered as listener again.
    +                   verify(bufferPool, times(17)).requestBuffer();
    +                   verify(bufferPool, times(2)).addBufferListener(inputChannel);
                        assertEquals("There should be 16 buffers available in the channel",
                                16, inputChannel.getNumberOfAvailableBuffers());
                        assertEquals("There should be 17 buffers required in the channel",
                                17, inputChannel.getNumberOfRequiredBuffers());
    -                   assertEquals("There should be 0 buffers available in local pool",
    +                   assertEquals("There should be 0 buffer available in local pool",
    --- End diff --
    
    `buffers` is correct here


> The tag of waiting for floating buffers in RemoteInputChannel should be 
> updated properly
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8747
>                 URL: https://issues.apache.org/jira/browse/FLINK-8747
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> In credit-based flow control mode, when the number of available buffers is 
> less than the number of required buffers (backlog + initialCredit), the 
> {{RemoteInputChannel}} requests floating buffers from the 
> {{BufferProvider}}. If it cannot get enough floating buffers, the 
> {{RemoteInputChannel}} registers itself as a listener in the 
> {{BufferProvider}} and sets the tag {{isWaitingForFloatingBuffers}} to 
> {{true}} to avoid registering repeatedly.
>  
> When a floating buffer is recycled to the {{BufferProvider}}, the provider 
> notifies the listener that a buffer is available. However, the listener may 
> no longer need floating buffers if the number of available buffers is not 
> less than the number of required buffers; in that case the floating buffer 
> is returned to the {{BufferProvider}} directly. Most importantly, the tag 
> {{isWaitingForFloatingBuffers}} should also be reset to {{false}}; 
> otherwise the {{RemoteInputChannel}} will not request floating buffers any 
> more once the available buffers again drop below the required buffers.
>  
> Two scenarios can cause the above issue:
>  * Recycled exclusive buffers increase the total number of available 
> buffers so that it is equal to or greater than the number of required 
> buffers.
>  * A decrease in the sender's backlog makes the number of available buffers 
> equal to the number of required buffers.
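
The stuck state described in the issue can be illustrated with a toy model. This is only a sketch under simplifying assumptions: `FloatingBufferSketch`, `Pool`, `Channel`, and all their fields and methods are hypothetical stand-ins that count buffers as plain integers, not Flink's actual `RemoteInputChannel` or `BufferProvider` code. It shows the second scenario (a decreased backlog) and where the tag has to be reset.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the issue; every name here is hypothetical, not Flink's real API. */
final class FloatingBufferSketch {

    /** Stand-in for the BufferProvider: a count of free floating buffers plus listeners. */
    static final class Pool {
        int free;
        final Deque<Channel> listeners = new ArrayDeque<>();

        Pool(int free) {
            this.free = free;
        }

        /** Hands out one floating buffer if any is free. */
        boolean request() {
            if (free == 0) {
                return false;
            }
            free--;
            return true;
        }

        /** Offers a recycled buffer to the first listener; keeps it if nobody takes it. */
        void recycle() {
            Channel listener = listeners.poll();
            if (listener == null || !listener.notifyBufferAvailable()) {
                free++;
            }
        }
    }

    /** Stand-in for the RemoteInputChannel, tracking buffer counts only. */
    static final class Channel {
        final Pool pool;
        final int initialCredit;
        int available;
        int required;
        boolean waitingForFloatingBuffers;

        Channel(Pool pool, int initialCredit) {
            this.pool = pool;
            this.initialCredit = initialCredit;
            this.available = initialCredit; // the exclusive buffers
            this.required = initialCredit;
        }

        /** Recomputes the requirement and requests floating buffers unless already waiting. */
        void onSenderBacklog(int backlog) {
            required = backlog + initialCredit;
            while (available < required && !waitingForFloatingBuffers) {
                if (pool.request()) {
                    available++;
                } else {
                    pool.listeners.add(this);
                    waitingForFloatingBuffers = true;
                }
            }
        }

        /** Called by the pool on recycle; returns true if the channel kept the buffer. */
        boolean notifyBufferAvailable() {
            if (available >= required) {
                // Buffer not needed: clear the tag so that a later backlog
                // increase can request buffers and register again.
                waitingForFloatingBuffers = false;
                return false;
            }
            available++;
            if (available < required) {
                pool.listeners.add(this); // still short, stay registered
            } else {
                waitingForFloatingBuffers = false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(2);
        Channel channel = new Channel(pool, 2);
        channel.onSenderBacklog(4); // needs 6, gets 4, registers as listener
        channel.onSenderBacklog(2); // needs 4, has 4, but is still registered
        pool.recycle();             // offered buffer is declined, tag is cleared
        channel.onSenderBacklog(3); // needs 5, can request from the pool again
        System.out.println(channel.available + " available, waiting="
                + channel.waitingForFloatingBuffers);
    }
}
```

If the assignment `waitingForFloatingBuffers = false` in the first branch of `notifyBufferAvailable()` is removed, the last `onSenderBacklog(3)` call in this model never enters the request loop, which mirrors the behavior the issue reports.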



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
