Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r139076168
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws
IOException {
globalPool.destroy();
}
+ /**
+ * Tests the interaction of requesting memory segments and creating
local buffer pool and
+ * verifies the number of assigned buffers match after redistributing
buffers because of newly
+ * requested memory segments or new buffer pools created.
+ */
+ @Test
+ public void testUniformDistributionBounded4() throws IOException {
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128,
MemoryType.HEAP);
+
+ BufferPool first = globalPool.createBufferPool(0, 10);
+ assertEquals(10, first.getNumBuffers());
+
+ List<MemorySegment> segmentList1 =
globalPool.requestMemorySegments(2);
+ assertEquals(2, segmentList1.size());
+ assertEquals(8, first.getNumBuffers());
+
+ BufferPool second = globalPool.createBufferPool(0, 10);
+ assertEquals(4, first.getNumBuffers());
+ assertEquals(4, second.getNumBuffers());
+
+ List<MemorySegment> segmentList2 =
globalPool.requestMemorySegments(2);
+ assertEquals(2, segmentList2.size());
+ assertEquals(3, first.getNumBuffers());
+ assertEquals(3, second.getNumBuffers());
+
+ List<MemorySegment> segmentList3 =
globalPool.requestMemorySegments(2);
+ assertEquals(2, segmentList3.size());
+ assertEquals(2, first.getNumBuffers());
+ assertEquals(2, second.getNumBuffers());
+
+ String msg = "Did not return all buffers to network buffer pool
after test.";
+ assertEquals(msg, 4,
globalPool.getNumberOfAvailableMemorySegments());
+
+ globalPool.recycleMemorySegments(segmentList1);
+ assertEquals(msg, 6,
globalPool.getNumberOfAvailableMemorySegments());
+ assertEquals(3, first.getNumBuffers());
+ assertEquals(3, second.getNumBuffers());
+
+ globalPool.recycleMemorySegments(segmentList2);
+ assertEquals(msg, 8,
globalPool.getNumberOfAvailableMemorySegments());
+ assertEquals(4, first.getNumBuffers());
+ assertEquals(4, second.getNumBuffers());
+
+ globalPool.recycleMemorySegments(segmentList3);
+ assertEquals(msg, 10,
globalPool.getNumberOfAvailableMemorySegments());
+ assertEquals(5, first.getNumBuffers());
+ assertEquals(5, second.getNumBuffers());
+
+ globalPool.destroy();
--- End diff --
The other tests in `BufferPoolFactoryTest` also has this issue. I will add
`destroyAllBufferPools()` in this new test.
---