Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6238#discussion_r199813070

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -396,4 +398,48 @@ public void go() throws Exception {
                 globalPool.destroy();
             }
         }
    +
    +    /**
    +     * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
    +     * remains in a defined state even if the waiting is interrupted.
    +     */
    +    @Test
    +    public void testRequestMemorySegmentsInterruptable2() throws Exception {
    +        final int numBuffers = 10;
    +
    +        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
    +        MemorySegment segment = globalPool.requestMemorySegment();
    +        assertNotNull(segment);
    +
    +        final OneShotLatch isRunning = new OneShotLatch();
    +        CheckedThread asyncRequest = new CheckedThread() {
    +            @Override
    +            public void go() throws Exception {
    +                isRunning.trigger();
    +                globalPool.requestMemorySegments(10);
    +            }
    +        };
    +        asyncRequest.start();
    +
    +        // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
    +        // call above. We cannot guarantee this though but make it highly probable:
    +        isRunning.await();
    +        Thread.sleep(10);
    +        asyncRequest.interrupt();
    +
    +        globalPool.recycle(segment);
    +
    +        try {
    +            asyncRequest.sync();
    +        } catch (IOException e) {
    +            assertThat(e, hasProperty("cause", instanceOf(InterruptedException.class)));
    +
    +            // test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
    +            // -> creating a new buffer pool should not fail
    +            globalPool.createBufferPool(10, 10);
    +        } finally {
    +            globalPool.destroy();
    +
    --- End diff --

    Remove line break.
---