[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166323#comment-16166323 ]
ASF GitHub Bot commented on FLINK-7378:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r138879343
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
this.numTotalRequiredBuffers += numRequiredBuffers;
- final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
- for (int i = 0 ; i < numRequiredBuffers ; i++) {
- segments.add(availableMemorySegments.poll());
- }
+ redistributeBuffers();
+ }
+ final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+ for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
- redistributeBuffers();
- } catch (IOException e) {
- if (segments.size() > 0) {
- recycleMemorySegments(segments);
- }
-
+ segments.add(availableMemorySegments.take());
--- End diff --
I know, I was the one who suggested it, but after thinking about the blocking
`take()` a bit more, and with some background I acquired over the last few
weeks, I now feel we should do the request similarly to
`LocalBufferPool#requestBuffer()`: if (for some reason) we end up waiting
forever, at least a call to `destroy()` can still stop us.
What do you think? I'm thinking about something like this:
```
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
	while (segments.size() < numRequiredBuffers) {
		// bail out if the pool was destroyed while we were waiting
		if (isDestroyed) {
			throw new IllegalStateException("Buffer pool is destroyed.");
		}

		// bounded wait instead of take(): isDestroyed is re-checked at least every 2 seconds
		final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
		if (segment != null) {
			segments.add(segment);
		}
	}
} catch (Throwable e) {
	// hand back any segments acquired so far, then propagate the failure
	recycleMemorySegments(segments);
	ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)
The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
	final int numBuffers = 10;

	NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
	MemorySegment segment = globalPool.requestMemorySegment();
	assertNotNull(segment);

	final OneShotLatch isRunning = new OneShotLatch();
	CheckedThread asyncRequest = new CheckedThread() {
		@Override
		public void go() throws Exception {
			isRunning.trigger();
			globalPool.requestMemorySegments(10);
		}
	};
	asyncRequest.start();

	// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
	// call above. We cannot guarantee this, but we make it highly probable:
	isRunning.await();
	Thread.sleep(10);
	globalPool.destroy();

	segment.free();

	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage("destroyed");
	asyncRequest.sync();
}
```
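Since the proposed loop depends on `NetworkBufferPool` internals, here is a self-contained sketch of the same pattern outside Flink. It assumes a hypothetical `SimpleSegmentPool` that hands out `byte[]` "segments" from a `LinkedBlockingQueue`, and a hypothetical `AbortDemo` that mirrors the test scenario above (10 segments, one held back, then a request for 10). The poll timeout is shortened to 50 ms to keep the demo quick; none of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a global segment pool (not Flink's NetworkBufferPool). */
class SimpleSegmentPool {
    private final LinkedBlockingQueue<byte[]> available = new LinkedBlockingQueue<>();
    private final long pollTimeoutMillis;
    private volatile boolean isDestroyed;

    SimpleSegmentPool(int numSegments, int segmentSize, long pollTimeoutMillis) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /** Requests exactly n segments, waiting in bounded steps so that destroy() can abort the wait. */
    List<byte[]> requestSegments(int n) throws InterruptedException {
        List<byte[]> segments = new ArrayList<>(n);
        try {
            while (segments.size() < n) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                // Bounded wait instead of take(): the loop re-checks isDestroyed after each timeout.
                byte[] segment = available.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
            return segments;
        } catch (Throwable t) {
            // Give back whatever was acquired before the failure, then rethrow the original exception.
            recycle(segments);
            throw t;
        }
    }

    void recycle(List<byte[]> segments) {
        // After destroy() the segments are simply dropped and left to the garbage collector.
        if (!isDestroyed) {
            available.addAll(segments);
        }
    }

    /** Marks the pool destroyed; a waiting requester notices within one poll timeout. */
    void destroy() {
        isDestroyed = true;
        available.clear();
    }
}

/** Hypothetical driver mirroring the test: a request that can never be satisfied is aborted. */
class AbortDemo {
    static Throwable runAbortScenario() {
        try {
            SimpleSegmentPool pool = new SimpleSegmentPool(10, 128, 50);
            pool.requestSegments(1); // hold one segment back so only 9 remain
            Throwable[] failure = new Throwable[1];
            Thread requester = new Thread(() -> {
                try {
                    pool.requestSegments(10); // can never be satisfied
                } catch (Throwable t) {
                    failure[0] = t;
                }
            });
            requester.start();
            Thread.sleep(100); // usually lets the requester reach its waiting loop first
            pool.destroy();
            requester.join(5_000); // join() makes failure[0] visible to this thread
            return failure[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("demo interrupted", e);
        }
    }
}
```

Whether `destroy()` lands before or during the wait, the requester ends with the `IllegalStateException`, which is why the bounded poll is preferable to a `take()` that could only be escaped by interrupting the thread.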
> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently, the number of network buffers in {{LocalBufferPool}} for
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a
> is the number of exclusive buffers for each channel and b is the number of
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a
> fixed-size buffer pool to manage the floating buffers for {{SingleInputGate}},
> while the exclusive buffers are assigned to {{InputChannel}}s directly.
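To make the buffer budget concrete: with a = 2 exclusive buffers per channel, 4 channels and b = 8 floating buffers, the gate needs 2 * 4 + 8 = 16 buffers. The sketch below computes that and models a pool whose size is fixed once at construction, i.e. one that a global redistribution never resizes. Both `BufferBudget` and `FixedSizeFloatingPool` are hypothetical names, and the non-blocking `requestBuffer()` returning `null` when exhausted is an assumption for illustration, not Flink's `BufferPool` API.

```java
import java.util.ArrayDeque;

/** Hypothetical helper for the "a * <number of channels> + b" buffer budget of one input gate. */
final class BufferBudget {
    private BufferBudget() {}

    /** a exclusive buffers per channel plus b floating buffers shared by all channels. */
    static int requiredBuffers(int exclusivePerChannel, int numChannels, int floatingBuffers) {
        // multiplyExact/addExact throw ArithmeticException on int overflow instead of wrapping
        return Math.addExact(Math.multiplyExact(exclusivePerChannel, numChannels), floatingBuffers);
    }
}

/**
 * Hypothetical fixed-size pool for the floating buffers: the number of buffers is set once at
 * construction and there is no method to change it, so no rebalancing can shrink or grow it.
 */
final class FixedSizeFloatingPool {
    private final int capacity;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();

    FixedSizeFloatingPool(int capacity, int bufferSize) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must be >= 0");
        }
        this.capacity = capacity;
        for (int i = 0; i < capacity; i++) {
            free.push(new byte[bufferSize]);
        }
    }

    /** Non-blocking: returns a buffer, or null if all floating buffers are currently in use. */
    synchronized byte[] requestBuffer() {
        return free.poll();
    }

    /** Returns a buffer previously obtained from requestBuffer(). */
    synchronized void recycle(byte[] buffer) {
        if (free.size() >= capacity) {
            throw new IllegalStateException("More buffers recycled than handed out.");
        }
        free.push(buffer);
    }

    synchronized int numAvailable() {
        return free.size();
    }

    int capacity() {
        return capacity;
    }
}
```

With the example numbers, each of the 4 channels would hold its 2 exclusive buffers directly, and only the 8 floating buffers would live in such a pool, shared on demand.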
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)