[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166323#comment-16166323 ]
ASF GitHub Bot commented on FLINK-7378:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138879343

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 			this.numTotalRequiredBuffers += numRequiredBuffers;
-			final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
-			for (int i = 0 ; i < numRequiredBuffers ; i++) {
-				segments.add(availableMemorySegments.poll());
-			}
+			redistributeBuffers();
+		}
+		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+		for (int i = 0 ; i < numRequiredBuffers ; i++) {
 			try {
-				redistributeBuffers();
-			} catch (IOException e) {
-				if (segments.size() > 0) {
-					recycleMemorySegments(segments);
-				}
-
+				segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but after thinking about the blocking `take()` a bit more, and with some background I have picked up over the last weeks, I have the feeling that we should handle the request similarly to `LocalBufferPool#requestBuffer()`. That way, if (for some reason) we end up waiting forever, we can at least be stopped by a call to `destroy()`. What do you think?
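To illustrate the difference outside of Flink, here is a minimal, standalone sketch. It is not Flink code: `AbortableRequestSketch`, `requestBuffers` and `runDemo` are hypothetical names, a `java.util.concurrent.ArrayBlockingQueue` stands in for `availableMemorySegments`, and `byte[]` stands in for `MemorySegment`. The bulk request waits in short timed `poll()` slices instead of a single `take()`, so a concurrent `destroy()` is noticed within one slice; a plain `take()` would keep waiting because nothing ever enqueues the missing buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a global buffer pool; byte[] plays the role of MemorySegment. */
final class AbortableRequestSketch {

    private final BlockingQueue<byte[]> available;
    private volatile boolean isDestroyed;

    AbortableRequestSketch(int numBuffers, int bufferSize) {
        available = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Non-blocking single request; returns null if no buffer is free. */
    byte[] requestBuffer() {
        return available.poll();
    }

    /** Bulk request that waits in slices of sliceMillis, re-checking isDestroyed between slices. */
    List<byte[]> requestBuffers(int numRequired, long sliceMillis) throws InterruptedException {
        List<byte[]> segments = new ArrayList<>(numRequired);
        try {
            while (segments.size() < numRequired) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                byte[] segment = available.poll(sliceMillis, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (RuntimeException | InterruptedException e) {
            // Give back whatever was acquired so far before propagating the failure.
            available.addAll(segments);
            throw e;
        }
        return segments;
    }

    void destroy() {
        isDestroyed = true;
    }

    /** Requests more buffers than are free, destroys the pool, and reports how the request ended. */
    static String runDemo() throws InterruptedException {
        AbortableRequestSketch pool = new AbortableRequestSketch(10, 128);
        pool.requestBuffer(); // hold one buffer, so a request for all 10 can never be satisfied

        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch started = new CountDownLatch(1);
        Thread requester = new Thread(() -> {
            started.countDown();
            try {
                pool.requestBuffers(10, 50);
            } catch (Throwable thrown) {
                failure.set(thrown);
            }
        });
        requester.start();

        started.await();
        Thread.sleep(10); // make it likely that the requester is already inside the wait loop
        pool.destroy();
        requester.join(5_000);

        if (requester.isAlive()) {
            return "still waiting";
        }
        Throwable error = failure.get();
        return error == null ? "completed" : error.getClass().getSimpleName() + ": " + error.getMessage();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // IllegalStateException: Buffer pool is destroyed.
    }
}
```

Whether `destroy()` lands before or during the wait, the request ends with the `IllegalStateException`, and the 9 buffers it had collected are returned to the queue. The slice length only bounds how quickly the abort is noticed.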
I'm thinking about something like this:

```
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
	while (segments.size() < numRequiredBuffers) {
		if (isDestroyed) {
			throw new IllegalStateException("Buffer pool is destroyed.");
		}

		final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
		if (segment != null) {
			segments.add(segment);
		}
	}
} catch (Throwable e) {
	recycleMemorySegments(segments);
	ExceptionUtils.rethrowIOException(e);
}
```

(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this behaviour:

```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
	final int numBuffers = 10;

	NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
	MemorySegment segment = globalPool.requestMemorySegment();
	assertNotNull(segment);

	final OneShotLatch isRunning = new OneShotLatch();
	CheckedThread asyncRequest = new CheckedThread() {
		@Override
		public void go() throws Exception {
			isRunning.trigger();
			globalPool.requestMemorySegments(10);
		}
	};
	asyncRequest.start();

	// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
	// call above.
	// We cannot guarantee this, but we can make it highly probable:
	isRunning.await();
	Thread.sleep(10);
	globalPool.destroy();

	segment.free();

	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage("destroyed");
	asyncRequest.sync();
}
```

> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7378
>                 URL: https://issues.apache.org/jira/browse/FLINK-7378
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a
> is the number of exclusive buffers for each channel and b is the number of
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a
> fixed-size buffer pool to manage the floating buffers for {{SingleInputGate}},
> while the exclusive buffers are assigned to {{InputChannel}}s directly.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
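The buffer accounting in the issue description can be made concrete with a small, hypothetical model. It is not Flink code: `FloatingBufferSketch` and `FixedSizePool` are illustrative names, buffers are plain `byte[]`, and concurrency is ignored. With a = 2 exclusive buffers per channel, 4 channels and b = 8 floating buffers, the bound {{a * <number of channels> + b}} is 2 * 4 + 8 = 16. In this sketch the floating pool's capacity is fixed when it is created and nothing rebalances it, while the exclusive buffers are handed to the channels directly and never pass through the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an input gate's buffers: a exclusive per channel plus b floating. */
final class FloatingBufferSketch {

    /** Upper bound from the issue description: a * numChannels + b. */
    static int maxGateBuffers(int exclusivePerChannel, int numChannels, int numFloating) {
        return exclusivePerChannel * numChannels + numFloating;
    }

    /** Pool whose size is fixed at construction; nothing ever redistributes it. */
    static final class FixedSizePool {
        private final int capacity;
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();

        FixedSizePool(int capacity, int bufferSize) {
            this.capacity = capacity;
            for (int i = 0; i < capacity; i++) {
                available.add(new byte[bufferSize]);
            }
        }

        /** Returns a floating buffer, or null if all of them are in use. */
        byte[] request() {
            return available.poll();
        }

        void recycle(byte[] buffer) {
            if (available.size() >= capacity) {
                throw new IllegalStateException("Recycled more buffers than the pool owns.");
            }
            available.add(buffer);
        }

        int numAvailable() {
            return available.size();
        }
    }

    public static void main(String[] args) {
        int a = 2, numChannels = 4, b = 8;

        // Exclusive buffers go straight to each channel and never enter the shared pool.
        List<List<byte[]>> exclusivePerChannel = new ArrayList<>();
        for (int ch = 0; ch < numChannels; ch++) {
            List<byte[]> own = new ArrayList<>();
            for (int i = 0; i < a; i++) {
                own.add(new byte[128]);
            }
            exclusivePerChannel.add(own);
        }

        // Floating buffers are shared by all channels through the fixed-size pool.
        FixedSizePool floating = new FixedSizePool(b, 128);
        byte[] borrowed = floating.request();
        floating.recycle(borrowed);

        System.out.println("max buffers = " + maxGateBuffers(a, numChannels, b)); // max buffers = 16
        System.out.println("exclusive = " + exclusivePerChannel.size() * a
                + ", floating available = " + floating.numAvailable()); // exclusive = 8, floating available = 8
    }
}
```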