reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1129022590
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ########## @@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. - assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } + @Test + void testDecreasePoolSize() throws Exception { + final int maxMemorySegments = 10; + final int requiredMemorySegments = 4; + final int maxOverdraftBuffers = 2; + final int largePoolSize = 5; + final int smallPoolSize = 4; + LocalBufferPool bufferPool = + new LocalBufferPool( + networkBufferPool, + requiredMemorySegments, + maxMemorySegments, + 0, + Integer.MAX_VALUE, + maxOverdraftBuffers); + Queue<MemorySegment> buffers = new LinkedList<>(); + + // set a larger pool size. + bufferPool.setNumBuffers(largePoolSize); + assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); + + // request all buffer. + for (int i = 0; i < largePoolSize; i++) { + buffers.add(bufferPool.requestMemorySegmentBlocking()); + } + assertThat(bufferPool.isAvailable()).isFalse(); + + // request 1 overdraft buffers. + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + + // set a small pool size. + bufferPool.setNumBuffers(smallPoolSize); + assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); + assertThat(bufferPool.isAvailable()).isFalse(); + + // return all overdraft buffers. + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); + assertThat(bufferPool.isAvailable()).isFalse(); + + // return the excess buffer. + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.isAvailable()).isFalse(); + // return non-excess buffers. + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isTrue(); + + while (!buffers.isEmpty()) { + bufferPool.recycle(buffers.poll()); + } + bufferPool.lazyDestroy(); + } + + @Test + void testIncreasePoolSize() throws Exception { Review Comment: Sure, I added a test for `largePoolSize = 7`. As for `largePoolSize=8`, I think it is no different from the current situation of `largePoolSize = 10`. It also exceeds the total number of buffers but does not reach `maxMemorySegments`. But I changed the previous test to `largePoolSize = 8`, and added test for the case of total number of buffers reached `maxMemorySegments`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org