Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141906648
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception {
verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
+ * floating buffers once receiving the producer's backlog.
+ */
+ @Test
+ public void testRequestFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+ // Receive the producer's backlog
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+ // Need to request 10 floating buffers from buffer pool
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+ // No need to request extra floating buffers from pool because
+ // there are already 10 available buffers in queue now
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+ // Need to request another floating buffer from pool
+ verify(bufferPool, times(11)).requestBuffer();
+ }
+
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
+ * floating buffers once receiving the producer's backlog. And it requests from pool only once
+ * and registers as listener if there are currently no available buffers in the pool.
+ */
+ @Test
+ public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(null);
+ when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
--- End diff --
How about using the real `NetworkBufferPool#createBufferPool()` here with a
limited set of buffers? Then you could start retrieving buffers as in the test
method above and continue with verifying the expected behaviour in case the
buffer limit was reached (no need for two test methods, I guess). I'd prefer
this over a mock so that you can also verify the interaction with the real
methods such as `addBufferListener()`.
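
To illustrate the idea with a toy model (this is not Flink code): `ToyPool` and `ToyChannel` below are hypothetical stand-ins for a size-limited buffer pool and the remote input channel. The sketch assumes the channel requests buffers until it holds as many as the announced backlog, and registers as a listener exactly once when the pool runs dry. With a bounded pool instead of a mock, a single test can cover both the request path and the wait path:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedPoolSketch {

    /** Hypothetical stand-in for a buffer pool with a hard limit (not Flink's class). */
    static final class ToyPool {
        private int available;
        private final Deque<Runnable> listeners = new ArrayDeque<>();

        ToyPool(int limit) {
            this.available = limit;
        }

        /** Returns a buffer token, or null once the limit has been reached. */
        Object requestBuffer() {
            if (available == 0) {
                return null;
            }
            available--;
            return new Object();
        }

        /** Registers a listener that would be notified when a buffer is recycled. */
        boolean addBufferListener(Runnable listener) {
            listeners.add(listener);
            return true;
        }

        int numListeners() {
            return listeners.size();
        }
    }

    /** Hypothetical channel: requests until it holds `backlog` buffers or the pool is empty. */
    static final class ToyChannel {
        private final ToyPool pool;
        private int held;
        private boolean waiting;

        ToyChannel(ToyPool pool) {
            this.pool = pool;
        }

        void onBacklog(int backlog) {
            while (held < backlog && !waiting) {
                if (pool.requestBuffer() != null) {
                    held++;
                } else {
                    // Pool exhausted: register once and stop requesting.
                    waiting = pool.addBufferListener(() -> { });
                }
            }
        }

        int held() {
            return held;
        }

        boolean isWaiting() {
            return waiting;
        }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(10);
        ToyChannel channel = new ToyChannel(pool);

        channel.onBacklog(10); // takes all 10 buffers from the pool
        channel.onBacklog(8);  // already holds enough, no further request
        channel.onBacklog(11); // needs one more, pool is empty, registers as listener

        System.out.println(channel.held() + " " + channel.isWaiting() + " " + pool.numListeners());
    }
}
```

In the actual test, the pool would presumably come from `NetworkBufferPool#createBufferPool()`, and the assertions would check the state of the real pool and channel rather than `verify(...)` calls on a mock.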
---