Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6238#discussion_r199813070
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
---
@@ -396,4 +398,48 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+ /**
+ * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+ * remains in a defined state even if the waiting is interrupted.
+ */
+ @Test
+ public void testRequestMemorySegmentsInterruptable2() throws Exception {
+ final int numBuffers = 10;
+
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ MemorySegment segment = globalPool.requestMemorySegment();
+ assertNotNull(segment);
+
+ final OneShotLatch isRunning = new OneShotLatch();
+ CheckedThread asyncRequest = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ isRunning.trigger();
+ globalPool.requestMemorySegments(10);
+ }
+ };
+ asyncRequest.start();
+
+ // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
+ // call above. We cannot guarantee this though but make it highly probable:
+ isRunning.await();
+ Thread.sleep(10);
+ asyncRequest.interrupt();
+
+ globalPool.recycle(segment);
+
+ try {
+ asyncRequest.sync();
+ } catch (IOException e) {
+ assertThat(e, hasProperty("cause", instanceOf(InterruptedException.class)));
+
+ // test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+ // -> creating a new buffer pool should not fail
+ globalPool.createBufferPool(10, 10);
+ } finally {
+ globalPool.destroy();
+
--- End diff --
Remove line break.
---