1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
 
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before the overdraft buffer was introduced, the definition of
   > available was very clear:
   > there is at least one availableMemorySegment and no subpartition has
   > reached maxBuffersPerChannel.
   > IMO, introducing the overdraft mechanism should not break this protocol;
   > the overdraft buffer should not affect the judgment of availability.
   
   Sorry, but the overdraft buffer should affect the judgment of availability.
   
   As we discussed before, an overdraft buffer is only used once the number of
   requested buffers has reached the upper limit (the pool size). In other
   words, overdraft buffers should only be used after the LocalBufferPool has
   become unavailable.
   
   And why is it named `overdraft`? Because it temporarily uses some extra
   buffers outside the LocalBufferPool. By the semantics of overdraft, if
   `numberOfRequestedOverdraftMemorySegments > 0`, then the LocalBufferPool
   must be unavailable. That's why I added the check here.
   
   Why do you want to remove it? My guess is that there was a bug before: the
   overdraft buffer was used even when the number of requested buffers had not
   reached the upper limit, and you have fixed that in this PR.
   
   ```
               if (!availableMemorySegments.isEmpty()) {
                   segment = availableMemorySegments.poll();
               } else if (isRequestedSizeReached()) {
                   // Only when the buffer request reaches the upper limit (i.e. current pool size),
                   // requests an overdraft buffer.
                   segment = requestOverdraftMemorySegmentFromGlobal();
               }
   ```
   
   > For me, the key point is whether we think "The availableMemorySegments is
   > always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable.
   
   If that holds now and will continue to hold in the future, we can remove the
   `&& numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be
   removed.
   
   Based on your feedback, I prefer to keep it, because the root cause is that
   overdraft was misused in some cases, and you have fixed that.
   
   If there are other bugs that misuse the overdraft buffer, we should fix them
   to ensure the overdraft buffer is used correctly, instead of marking the
   LocalBufferPool as available. Marking the LocalBufferPool as available
   directly may cause other unexpected bugs.
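
To make the argument above concrete, here is a minimal toy model. It is a sketch under stated assumptions, not the actual `LocalBufferPool`: the class `ToyPool`, its fields, and its methods are all hypothetical, segments are reduced to counters, and subpartition limits are ignored. It only illustrates that, as long as overdraft is requested solely when no ordinary segment is idle, the availability check with and without the `numberOfRequestedOverdraftMemorySegments == 0` term gives the same answer.

```java
/** Toy model for illustration only; this is NOT Flink's LocalBufferPool. */
final class ToyPool {
    private final int poolSize;     // hypothetical: number of ordinary segments
    private final int maxOverdraft; // hypothetical: cap on overdraft segments
    private int available;          // idle ordinary segments
    private int overdraft;          // overdraft segments currently handed out

    ToyPool(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
        this.available = poolSize; // assumption: all ordinary segments start idle
    }

    /** Hands out one segment; returns false if nothing can be handed out. */
    boolean request() {
        if (available > 0) {
            available--;
            return true;
        }
        // Overdraft is touched only after every ordinary segment is in use.
        if (overdraft < maxOverdraft) {
            overdraft++;
            return true;
        }
        return false;
    }

    /** Takes one segment back; overdraft is repaid before anything becomes idle. */
    void recycle() {
        if (overdraft > 0) {
            overdraft--;
        } else if (available < poolSize) {
            available++;
        } else {
            throw new IllegalStateException("nothing is handed out");
        }
    }

    /** Availability check that keeps the overdraft term. */
    boolean availableWithOverdraftTerm() {
        return available > 0 && overdraft == 0;
    }

    /** Availability check with the overdraft term removed. */
    boolean availableWithoutOverdraftTerm() {
        return available > 0;
    }

    /** The invariant under discussion: overdraft in use implies nothing is idle. */
    boolean invariantHolds() {
        return overdraft == 0 || available == 0;
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(2, 1);
        boolean[] isRequest = {true, true, true, false, false, false};
        for (boolean req : isRequest) {
            if (req) {
                pool.request();
            } else {
                pool.recycle();
            }
            System.out.println(
                    "invariant=" + pool.invariantHolds()
                            + " with=" + pool.availableWithOverdraftTerm()
                            + " without=" + pool.availableWithoutOverdraftTerm());
        }
    }
}
```

In this model the two checks can only diverge if some code path hands out overdraft while an ordinary segment is still idle, which is exactly the kind of misuse the extra term would hide or expose.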



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
 
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
 
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   > We can test more cases for increasing pool size, especially some boundary
   > conditions, such as largePoolSize = 7 and largePoolSize = 8. The values 6
   > and 10 are not boundary values, so some corner-case bugs cannot be caught.
   
   Could you add `largePoolSize = 7` and `largePoolSize = 8` to this test?



