Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141908388
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws
Exception {
verify(inputGate,
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input
channel tries to request
+ * floating buffers once receiving the producer's backlog.
+ */
+ @Test
+ public void testRequestFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+
when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(inputGate);
+
+ // Receive the producer's backlog
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+ // Need to request 10 floating buffers from buffer pool
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+ // No need to request extra floating buffers from pool because
+ // there are already 10 available buffers in queue now
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+ // Need to request another floating buffer from pool
+ verify(bufferPool, times(11)).requestBuffer();
+ }
+
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input
channel tries to request
+ * floating buffers once receiving the producer's backlog. And it
requests from pool only once
+ * and registers as listener if there are currently no available
buffers in the pool.
+ */
+ @Test
+ public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(null);
+
when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+ when(inputGate.getBufferProvider()).thenReturn(bufferPool);
+
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(inputGate);
+
+ // Receive the producer's backlog
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+ // Request from pool only once if there are no available
floating buffers
+ verify(bufferPool, times(1)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10);
+ // Already registers as listener to wait for notifications and
will not request any more
+ verify(bufferPool, times(1)).requestBuffer();
+ }
+
--- End diff --
actually, some more tests would be nice:
- ensuring a fair distribution of buffers to `BufferListener`s
- to verify that there is no race condition with two things running in
parallel: `onSenderBacklog()` or `notifyBufferAvailable()` requesting buffers
and some other thread recycling them (floating and/or exclusive ones).
---