[ https://issues.apache.org/jira/browse/FLINK-8747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373811#comment-16373811 ]
ASF GitHub Bot commented on FLINK-8747: --------------------------------------- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5558#discussion_r170148337 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -396,32 +399,49 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { 18, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); // Decrease the backlog - inputChannel.onSenderBacklog(15); + inputChannel.onSenderBacklog(13); // Only the number of required buffers is changed by (backlog + numExclusiveBuffers) verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); assertEquals("There should be 15 buffers available in the channel", 15, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 17 buffers required in the channel", - 17, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 15 buffers required in the channel", + 15, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); - // Recycle one exclusive buffer - exclusiveBuffer.recycleBuffer(); + // Recycle one more floating buffer + floatingBufferQueue.poll().recycleBuffer(); - // The exclusive buffer is returned to the channel directly + // Return the floating buffer to the buffer pool and the channel is not waiting for more floating buffers verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 15 buffers available in the channel", + 15, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 15 buffers required in the channel", + 15, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 1 buffers available in local pool", + 1, bufferPool.getNumberOfAvailableMemorySegments()); + assertFalse(inputChannel.isWaitingForFloatingBuffers()); + + // Increase the backlog again + inputChannel.onSenderBacklog(15); + + // The floating buffer is requested from the buffer pool and the channel is registered as listener again. + verify(bufferPool, times(17)).requestBuffer(); + verify(bufferPool, times(2)).addBufferListener(inputChannel); assertEquals("There should be 16 buffers available in the channel", 16, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 17 buffers required in the channel", 17, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffers available in local pool", + assertEquals("There should be 0 buffer available in local pool", --- End diff -- I also fixed other points with `buffers`. > The tag of waiting for floating buffers in RemoteInputChannel should be > updated properly > ---------------------------------------------------------------------------------------- > > Key: FLINK-8747 > URL: https://issues.apache.org/jira/browse/FLINK-8747 > Project: Flink > Issue Type: Sub-task > Components: Network > Affects Versions: 1.5.0 > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > > In credit-based flow control mode, when the number of available buffers is > less than required buffers (backlog + initialCredit), the > {{RemoteInputChannel}} will request floating buffers from {{BufferProvider}}. > If not get enough available floating buffers, the {{RemoteInputChannel}} > registers itself as listener in {{BufferProvider}} and updates the tag > {{isWaitingForFloatingBuffers}} as {{true}} to avoid registration repeatedly. > > When a floating buffer is recycled to {{BufferProvider}}, it will notify the > listener of available buffer. But the listener may not need floating buffers > currently if the available buffers is not less than required buffers, then > the floating buffers will be returned to {{BufferProvider}} directly. Most > importantly, the tag {{isWaitingForFloatingBuffers}} should also be updated > as {{false, otherwise the RemoteInputChannel}} will not request floating > buffers any more after the available buffers less than required buffers. > > There are two scenarios for causing the above issue: > * The recycled exclusive buffers increase the total available buffers which > is equal to or more than required buffers. > * The decreased sender's backlog resulting the available buffers equal to > required buffers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)