[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174493#comment-16174493 ]
ASF GitHub Bot commented on FLINK-7378:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140187056

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
             globalPool.destroy();
         }

    +    /**
    +     * Tests the interaction of requesting memory segments and creating local buffer pool and
    +     * verifies the number of assigned buffers match after redistributing buffers because of newly
    +     * requested memory segments or new buffer pools created.
    +     */
    +    @Test
    +    public void testUniformDistributionBounded4() throws IOException {
    +        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +        BufferPool first = globalPool.createBufferPool(0, 10);
    +        assertEquals(10, first.getNumBuffers());
    +
    +        List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +        assertEquals(2, segmentList1.size());
    +        assertEquals(8, first.getNumBuffers());
    +
    +        BufferPool second = globalPool.createBufferPool(0, 10);
    +        assertEquals(4, first.getNumBuffers());
    +        assertEquals(4, second.getNumBuffers());
    +
    +        List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +        assertEquals(2, segmentList2.size());
    +        assertEquals(3, first.getNumBuffers());
    +        assertEquals(3, second.getNumBuffers());
    +
    +        List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +        assertEquals(2, segmentList3.size());
    +        assertEquals(2, first.getNumBuffers());
    +        assertEquals(2, second.getNumBuffers());
    +
    +        String msg = "Did not return all buffers to network buffer pool after test.";
    +        assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    +
    +        globalPool.recycleMemorySegments(segmentList1);
    +        assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
    +        assertEquals(3, first.getNumBuffers());
    +        assertEquals(3, second.getNumBuffers());
    +
    +        globalPool.recycleMemorySegments(segmentList2);
    +        assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
    +        assertEquals(4, first.getNumBuffers());
    +        assertEquals(4, second.getNumBuffers());
    +
    +        globalPool.recycleMemorySegments(segmentList3);
    +        assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments());
    +        assertEquals(5, first.getNumBuffers());
    +        assertEquals(5, second.getNumBuffers());
    +
    +        globalPool.destroy();
    --- End diff --

    Oh yes, you're right. I guess we were lucky after all since (after quickly scanning over the class) it does not seem that any buffers are requested from the created pools. The `verifyAllBuffersReturned()` method verifies that by checking `networkBufferPool.getNumberOfAvailableMemorySegments()` (as did you) but in case of failures, the buffer pools are not cleaned up either. Maybe we should put the cleanup into the `finally` block of a surrounding try-catch as the following (and similar in your new test)?

    ```
    @After
    public void verifyAllBuffersReturned() {
        String msg = "Did not return all buffers to network buffer pool after test.";
        try {
            assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
        } finally {
            // in case buffers have actually been requested, we must release them again
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }
    ```


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7378
>                 URL: https://issues.apache.org/jira/browse/FLINK-7378
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a is the number of exclusive buffers for each channel and b is the number of floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix size buffer pool used to manage the floating buffers for {{SingleInputGate}}.
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)