GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/5558#discussion_r170028724
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -396,32 +399,49 @@ public void
testAvailableBuffersLessThanRequiredBuffers() throws Exception {
18, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in
local pool",
0,
bufferPool.getNumberOfAvailableMemorySegments());
+ assertTrue(inputChannel.isWaitingForFloatingBuffers());
// Decrease the backlog
- inputChannel.onSenderBacklog(15);
+ inputChannel.onSenderBacklog(13);
// Only the number of required buffers is changed by
(backlog + numExclusiveBuffers)
verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool,
times(1)).addBufferListener(inputChannel);
assertEquals("There should be 15 buffers available in
the channel",
15, inputChannel.getNumberOfAvailableBuffers());
- assertEquals("There should be 17 buffers required in
the channel",
- 17, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 15 buffers required in
the channel",
+ 15, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in
local pool",
0,
bufferPool.getNumberOfAvailableMemorySegments());
+ assertTrue(inputChannel.isWaitingForFloatingBuffers());
- // Recycle one exclusive buffer
- exclusiveBuffer.recycleBuffer();
+ // Recycle one more floating buffer
+ floatingBufferQueue.poll().recycleBuffer();
- // The exclusive buffer is returned to the channel
directly
+ // Return the floating buffer to the buffer pool and
the channel is not waiting for more floating buffers
verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool,
times(1)).addBufferListener(inputChannel);
+ assertEquals("There should be 15 buffers available in
the channel",
+ 15, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 15 buffers required in
the channel",
+ 15, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 1 buffers available in
local pool",
+ 1,
bufferPool.getNumberOfAvailableMemorySegments());
+ assertFalse(inputChannel.isWaitingForFloatingBuffers());
+
+ // Increase the backlog again
+ inputChannel.onSenderBacklog(15);
+
+ // The floating buffer is requested from the buffer
pool and the channel is registered as listener again.
+ verify(bufferPool, times(17)).requestBuffer();
+ verify(bufferPool,
times(2)).addBufferListener(inputChannel);
assertEquals("There should be 16 buffers available in
the channel",
16, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 17 buffers required in
the channel",
17, inputChannel.getNumberOfRequiredBuffers());
- assertEquals("There should be 0 buffers available in
local pool",
+ assertEquals("There should be 0 buffer available in
local pool",
--- End diff --
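The plural `buffers` is correct here, even with a count of 0 — the original message "There should be 0 buffers available in local pool" should be kept rather than changed to "0 buffer".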
---