1996fanrui commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128900085
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
Why can the `numberOfRequestedOverdraftMemorySegments == 0` check be removed?

I think there is an invariant here: `availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments > 0`. Before this PR the invariant was sometimes violated, and we want it to always hold from now on, right?

If so, could you add a comment on `numberOfRequestedOverdraftMemorySegments` that documents it? That would help other developers understand the constraint.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
I would prefer to rename `testIncreasePoolSize` to `testIncreasePoolSizeExceedTotalBuffers`, and to make `testIncreasePoolSizeExceedTotalBuffers` and `testIncreasePoolSizeNotExceedTotalBuffers` plain helper methods instead of test methods. Then we can add a new `testIncreasePoolSize()` test method that calls both helpers.

The parameters, such as `largePoolSize`, `smallPoolSize` and `maxOverdraftBuffers`, can be defined inside `testIncreasePoolSize()` and passed to the helpers.

Why do I prefer this? It lets us test more cases for increasing the pool size, especially boundary conditions such as `largePoolSize = 7` and `largePoolSize = 8`. The values 6 and 10 are not boundary values, so some corner-case bugs cannot be caught with them.

WDYT?
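
The restructuring proposed here could look roughly like the outline below. It only illustrates the shape (one driver, two parameterized helpers) and is not Flink code. `IncreasePoolSizeOutline` and `isAvailableAfterIncrease` are hypothetical stand-ins for the real pool interaction, and `smallPoolSize = 5` with `maxOverdraftBuffers = 2` are assumed values, picked so that the boundary falls on 7 and 8.

```java
/** Outline only, not Flink code: one driver method calling two parameterized helpers. */
final class IncreasePoolSizeOutline {

    /**
     * Hypothetical stand-in for the real pool interaction. Assumes that smallPoolSize
     * ordinary buffers plus maxOverdraftBuffers overdraft buffers are outstanding, and
     * that the pool becomes available only if the new size is larger than that total.
     */
    static boolean isAvailableAfterIncrease(
            int smallPoolSize, int largePoolSize, int maxOverdraftBuffers) {
        int totalBuffers = smallPoolSize + maxOverdraftBuffers;
        return largePoolSize > totalBuffers;
    }

    /** Plain helper (no @Test): the new size does not exceed the outstanding buffers. */
    static void testIncreasePoolSizeNotExceedTotalBuffers(
            int smallPoolSize, int largePoolSize, int maxOverdraftBuffers) {
        if (isAvailableAfterIncrease(smallPoolSize, largePoolSize, maxOverdraftBuffers)) {
            throw new AssertionError("expected unavailable at size " + largePoolSize);
        }
    }

    /** Plain helper (no @Test): the new size exceeds the outstanding buffers. */
    static void testIncreasePoolSizeExceedTotalBuffers(
            int smallPoolSize, int largePoolSize, int maxOverdraftBuffers) {
        if (!isAvailableAfterIncrease(smallPoolSize, largePoolSize, maxOverdraftBuffers)) {
            throw new AssertionError("expected available at size " + largePoolSize);
        }
    }

    /** The only method that would carry @Test in the real test class. */
    static void testIncreasePoolSize() {
        final int smallPoolSize = 5; // assumed value
        final int maxOverdraftBuffers = 2; // assumed value

        testIncreasePoolSizeNotExceedTotalBuffers(smallPoolSize, 6, maxOverdraftBuffers);
        // Boundary: the new size equals the number of outstanding buffers.
        testIncreasePoolSizeNotExceedTotalBuffers(smallPoolSize, 7, maxOverdraftBuffers);
        // Boundary: the new size is one larger than the outstanding buffers.
        testIncreasePoolSizeExceedTotalBuffers(smallPoolSize, 8, maxOverdraftBuffers);
        testIncreasePoolSizeExceedTotalBuffers(smallPoolSize, 10, maxOverdraftBuffers);
    }

    public static void main(String[] args) {
        testIncreasePoolSize();
    }
}
```

In the real test class the helpers would drive a `LocalBufferPool` and assert on it with AssertJ; the arithmetic stand-in is there only to keep the outline compilable on its own.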
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
         currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+        // reset overdraft buffers
+        while (numberOfRequestedOverdraftMemorySegments > 0
+                && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
I see this works now because we return overdraft buffers first, right?

However, I think converting `numberOfRequestedMemorySegments` to `numberOfRequestedOverdraftMemorySegments` is clearer, and it makes the semantics of these fields more explicit.

I'm not sure which one is better. WDYT?
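
For reference, the loop in the `setNumBuffers` hunk above can be sketched with plain counters. The hunk shows only the loop condition, so the body used here (move one buffer from the overdraft count to the ordinary count per iteration) is an assumption. `OverdraftCounters` and its field names are hypothetical stand-ins, not the `LocalBufferPool` fields.

```java
/** Counter-only sketch, not Flink code: reclassifies overdraft buffers when the pool grows. */
final class OverdraftCounters {
    int currentPoolSize;
    int requested; // ordinary buffers currently handed out
    int overdraft; // overdraft buffers currently handed out

    OverdraftCounters(int currentPoolSize, int requested, int overdraft) {
        this.currentPoolSize = currentPoolSize;
        this.requested = requested;
        this.overdraft = overdraft;
    }

    /**
     * Mirrors the loop condition from the hunk. The loop body is assumed: each iteration
     * turns one overdraft buffer into an ordinary one while the pool still has room.
     */
    void setNumBuffers(int numBuffers) {
        currentPoolSize = numBuffers;
        while (overdraft > 0 && requested < currentPoolSize) {
            overdraft--;
            requested++;
        }
    }

    public static void main(String[] args) {
        OverdraftCounters counters = new OverdraftCounters(5, 5, 2);
        counters.setNumBuffers(6);
        System.out.println(
                "requested=" + counters.requested + ", overdraft=" + counters.overdraft);
    }
}
```

Under this assumption the total number of outstanding buffers never changes; growing the pool only moves buffers from the overdraft count to the ordinary count until one of the two loop conditions fails.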