GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r138879343
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
this.numTotalRequiredBuffers += numRequiredBuffers;
- final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
- for (int i = 0 ; i < numRequiredBuffers ; i++) {
- segments.add(availableMemorySegments.poll());
- }
+ redistributeBuffers();
+ }
+ final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+ for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
- redistributeBuffers();
- } catch (IOException e) {
- if (segments.size() > 0) {
- recycleMemorySegments(segments);
- }
-
+ segments.add(availableMemorySegments.take());
--- End diff --
I know, I was the one who suggested it, but after thinking about the blocking
`take()` a bit more, and with some more background I acquired over the last
weeks, I have the feeling that we should do the request similarly to
`LocalBufferPool#requestBuffer()`. `take()` only returns once a segment becomes
available (or the thread is interrupted), so if (for some reason) we end up
waiting forever, nothing stops us. With a timed `poll()` in a loop, we would
re-check the destroyed flag periodically and could at least be stopped by a
call to `destroy()`. What do you think? I'm thinking about something like this:
```java
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
	while (segments.size() < numRequiredBuffers) {
		if (isDestroyed) {
			throw new IllegalStateException("Buffer pool is destroyed.");
		}

		final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
		if (segment != null) {
			segments.add(segment);
		}
	}
} catch (Throwable e) {
	recycleMemorySegments(segments);
	ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)
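To illustrate the difference outside of Flink, here is a minimal self-contained sketch of the same timed-poll pattern. `AbortablePoolSketch`, `requestOne()`, `requestMany()` and `destroy()` are hypothetical names, `byte[]` stands in for `MemorySegment`, and a `LinkedBlockingQueue` stands in for `availableMemorySegments`; this is not the actual `NetworkBufferPool` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a global buffer pool (not Flink's NetworkBufferPool). */
public class AbortablePoolSketch {

	// Free segments; byte[] stands in for MemorySegment.
	private final BlockingQueue<byte[]> available = new LinkedBlockingQueue<>();

	// Written by destroy(), re-checked by waiting requesters after every timed poll.
	private volatile boolean isDestroyed;

	public AbortablePoolSketch(int numSegments, int segmentSize) {
		for (int i = 0; i < numSegments; i++) {
			available.add(new byte[segmentSize]);
		}
	}

	/** Non-blocking single request; returns null if nothing is free. */
	public byte[] requestOne() {
		return available.poll();
	}

	/** Blocks until num segments are acquired, but gives up once destroy() has been called. */
	public List<byte[]> requestMany(int num) throws InterruptedException {
		final List<byte[]> segments = new ArrayList<>(num);
		try {
			while (segments.size() < num) {
				if (isDestroyed) {
					throw new IllegalStateException("Buffer pool is destroyed.");
				}
				// A bare take() would wait here indefinitely; the timed poll returns null
				// after at most 2 seconds so the loop can re-check isDestroyed.
				final byte[] segment = available.poll(2, TimeUnit.SECONDS);
				if (segment != null) {
					segments.add(segment);
				}
			}
		} catch (RuntimeException | InterruptedException e) {
			// Return everything acquired so far before propagating the failure.
			available.addAll(segments);
			throw e;
		}
		return segments;
	}

	public void destroy() {
		isDestroyed = true;
	}

	public static void main(String[] args) throws Exception {
		final AbortablePoolSketch pool = new AbortablePoolSketch(10, 128);
		// Hold one segment so that a request for all 10 can never complete.
		final byte[] held = pool.requestOne();
		final Throwable[] failure = new Throwable[1];

		Thread requester = new Thread(() -> {
			try {
				pool.requestMany(10);
			} catch (Throwable t) {
				failure[0] = t;
			}
		});
		requester.start();

		Thread.sleep(100); // most likely the requester is now waiting in poll()
		pool.destroy();
		requester.join(); // returns within about one poll timeout

		// prints "java.lang.IllegalStateException: Buffer pool is destroyed."
		System.out.println(failure[0]);
	}
}
```

With `take()` instead of the timed `poll()`, the `join()` in `main()` would never return, because the requester would never get a chance to re-check `isDestroyed`.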
The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
```java
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
	final int numBuffers = 10;

	final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
	MemorySegment segment = globalPool.requestMemorySegment();
	assertNotNull(segment);

	final OneShotLatch isRunning = new OneShotLatch();
	CheckedThread asyncRequest = new CheckedThread() {
		@Override
		public void go() throws Exception {
			isRunning.trigger();
			globalPool.requestMemorySegments(10);
		}
	};
	asyncRequest.start();

	// We want destroy() to be called while globalPool.requestMemorySegments() above is blocked.
	// We cannot guarantee this, but we make it highly probable:
	isRunning.await();
	Thread.sleep(10);
	globalPool.destroy();

	segment.free();

	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage("destroyed");
	asyncRequest.sync();
}
```
---