1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128900085


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   Why the `numberOfRequestedOverdraftMemorySegments  == 0` can be removed? I 
think there is a constraint here: The `availableMemorySegments` is always empty 
when `numberOfRequestedOverdraftMemorySegments > 0`. 
   
   Sometimes this constraint does not hold before this PR, and we want to hold 
the constraint in the future, right?
   
   If yes, could you add some comments for 
`numberOfRequestedOverdraftMemorySegments`? It's helpful for other developers 
to understand the constraint.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   I prefer change the `testIncreasePoolSize` to the 
`testIncreasePoolSizeExceedTotalBuffers`. And the 
`testIncreasePoolSizeExceedTotalBuffers` and 
`testIncreasePoolSizeNotExceedTotalBuffers` as the normal method instead of 
test method. 
   
   And create a new `testIncreasePoolSize()` method as the test method, it 
calls `testIncreasePoolSizeExceedTotalBuffers` and 
`testIncreasePoolSizeNotExceedTotalBuffers`. We can define these parameters 
inside of the `testIncreasePoolSize()`, such as: `largePoolSize`, 
`smallPoolSize` and `maxOverdraftBuffers`.
   
   Why do I prefer this? We can test more cases for increasing pool size, 
especially some boundary conditions, such as `largePoolSize = 7` and 
`largePoolSize = 8`.  The 6 and 10 are not boundary values, and some corner 
bugs cannot be tested. 
   
   WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
             currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+            // reset overdraft buffers
+            while (numberOfRequestedOverdraftMemorySegments > 0
+                    && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   I see it can work now due to we return overdraft buffer first, right?
   
   However I think convert `numberOfRequestedMemorySegments` to 
`numberOfRequestedOverdraftMemorySegments ` is more clear, and the semantic of 
these fields are more explicit.
   
   I'm not sure which one is better. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to