GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r143425927
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -434,6 +435,52 @@ public void testWaitForFloatingBuffersOnBuffer() throws Exception {
verify(bufferPool, times(1)).requestBuffer();
}
+ /**
+ * Tests to verify that there is no race condition with two things running in parallel:
+ * requesting floating buffers and some other thread recycling them.
+ */
+ @Test
+ public void testConcurrentRequestBufferAndNotifyBufferAvailable() throws Exception {
+ // Setup
+ final ExecutorService executor = Executors.newFixedThreadPool(1);
+ final Buffer buffer = TestBufferFactory.createBuffer();
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(null);
+ when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+ when(inputGate.getBufferProvider()).thenReturn(bufferPool);
+ try {
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+ // Trigger to request one floating buffer on sender backlog
+ inputChannel.onBuffer(buffer, 0, 1);
+
+ final CountDownLatch sync = new CountDownLatch(1);
+
+ // Submit task and wait to finish
+ Future<Void> result = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ inputChannel.notifyBufferAvailable(buffer);
+ sync.countDown();
+
+ return null;
+ }
+ });
--- End diff --
This will not run the code in parallel: `onBuffer()` is called on the test
thread before the task is even submitted, so it always executes before
`inputChannel.notifyBufferAvailable(buffer)`.
Among the tests I recently came across,
`BlobServerGetTest#testConcurrentGetOperations()` may be a good base for a
new concurrency test.
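
For illustration, here is a minimal sketch of one way to make the two calls actually overlap: each task waits on a shared `CyclicBarrier` before invoking its method, and the pool has two threads so neither task blocks the other. `FakeChannel`, `runRace`, and the class name are hypothetical stand-ins rather than Flink classes, and the sketch is not taken from `BlobServerGetTest`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentStartSketch {

    /** Hypothetical stand-in for the channel under test; not a Flink class. */
    static final class FakeChannel {
        private final AtomicInteger events = new AtomicInteger();

        void onBuffer() {
            events.incrementAndGet();
        }

        void notifyBufferAvailable() {
            events.incrementAndGet();
        }

        int events() {
            return events.get();
        }
    }

    /** Races the two calls against each other and returns the total number of calls made. */
    static int runRace(int rounds) throws Exception {
        // Two threads, so both tasks can be running at the same time.
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            int total = 0;
            for (int i = 0; i < rounds; i++) {
                final FakeChannel channel = new FakeChannel();
                // Both tasks block here until the other one has arrived.
                final CyclicBarrier barrier = new CyclicBarrier(2);

                Callable<Void> requestTask = () -> {
                    barrier.await();
                    channel.onBuffer();
                    return null;
                };
                Callable<Void> recycleTask = () -> {
                    barrier.await();
                    channel.notifyBufferAvailable();
                    return null;
                };

                Future<Void> request = executor.submit(requestTask);
                Future<Void> recycle = executor.submit(recycleTask);

                // get() rethrows any exception from the tasks; the timeout guards against a hang.
                request.get(10, TimeUnit.SECONDS);
                recycle.get(10, TimeUnit.SECONDS);
                total += channel.events();
            }
            return total;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("calls observed: " + runRace(100));
    }
}
```

Repeating the race over many rounds matters because a single run rarely hits a problematic interleaving; the barrier only lines up the start, it does not force any particular ordering afterwards.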
---