1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
         mayNotifyAvailable(toNotify);
     }
+    @GuardedBy("availableMemorySegments")
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
-        return !availableMemorySegments.isEmpty()
-                && unavailableSubpartitionsCount == 0
-                && numberOfRequestedOverdraftMemorySegments == 0;
+        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before the overdraft buffer was introduced, the definition of available was very clear: there is at least one availableMemorySegment and no subpartition has reached maxBuffersPerChannel. IMO, introducing the overdraft mechanism should not break this protocol; the overdraft buffer should not affect the judgment of availability.

   Sorry, but I think the overdraft buffer should affect the judgment of availability. As we discussed before, an overdraft buffer is used only when the number of requested buffers has reached the upper limit (the pool size). In other words, overdraft buffers should be used only after the LocalBufferPool has become unavailable.

   Why is it called `overdraft`? Because it temporarily uses some extra buffers outside the LocalBufferPool. By the semantics of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, then the LocalBufferPool must be unavailable. That's why I added the check here.

   Why do you want to remove it? My guess is that there was a bug before: an overdraft buffer could be used even though the number of requested buffers had not reached the upper limit, and you have fixed that in this PR.

   ```
   if (!availableMemorySegments.isEmpty()) {
       segment = availableMemorySegments.poll();
   } else if (isRequestedSizeReached()) {
       // Only when the buffer request reaches the upper limit(i.e. current pool size),
       // requests an overdraft buffer.
       segment = requestOverdraftMemorySegmentFromGlobal();
   }
   ```

   > For me, the key point is whether the statement "availableMemorySegments is always empty when numberOfRequestedOverdraftMemorySegments != 0" is tenable. If it holds now and in the future, we can remove the ` && numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be removed.

   Based on your feedback, I prefer to keep it, because the root cause is that overdraft was misused in some cases, and you have fixed that. If another bug misuses the overdraft buffer, we should fix that bug so that overdraft buffers are used correctly, instead of marking the LocalBufferPool as available. Marking the LocalBufferPool as available directly may cause other unexpected bugs.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();
         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }
+    @Test
+    void testDecreasePoolSize() throws Exception {
+        final int maxMemorySegments = 10;
+        final int requiredMemorySegments = 4;
+        final int maxOverdraftBuffers = 2;
+        final int largePoolSize = 5;
+        final int smallPoolSize = 4;
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        requiredMemorySegments,
+                        maxMemorySegments,
+                        0,
+                        Integer.MAX_VALUE,
+                        maxOverdraftBuffers);
+        Queue<MemorySegment> buffers = new LinkedList<>();
+
+        // set a larger pool size.
+        bufferPool.setNumBuffers(largePoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+        // request all buffer.
+        for (int i = 0; i < largePoolSize; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set a small pool size.
+        bufferPool.setNumBuffers(smallPoolSize);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {

Review Comment:
   > We can test more cases for increasing the pool size, especially boundary conditions such as largePoolSize = 7 and largePoolSize = 8. The values 6 and 10 are not boundary values, so some corner-case bugs cannot be caught.

   Could you add `largePoolSize = 7 and largePoolSize = 8` in this test?
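
To make the boundary argument concrete, here is a toy counter model. It is not Flink's `LocalBufferPool`: the class `ToyBufferPool`, its method names, the small pool size of 5, the overdraft limit of 2, and the rule that growing the pool re-labels outstanding overdraft buffers as ordinary ones are all assumptions made for illustration. Under those assumptions, 7 is the first pool size at which no overdraft remains and 8 is the first at which the pool is available again, so sizes such as 6 and 10 would step over both transitions.

```java
/** Toy model only: counts buffers instead of holding memory. Not Flink's LocalBufferPool. */
final class ToyBufferPool {
    private int poolSize;
    private final int maxOverdraft;
    private int ordinaryInUse;
    private int overdraftInUse;

    ToyBufferPool(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    /** Ordinary buffers that could still be handed out without overdrawing. */
    int freeOrdinary() {
        return Math.max(0, poolSize - ordinaryInUse);
    }

    /** Hands out one buffer; overdraft is used only once the pool size is reached. */
    void request() {
        if (freeOrdinary() > 0) {
            ordinaryInUse++;
        } else if (overdraftInUse < maxOverdraft) {
            overdraftInUse++;
        } else {
            throw new IllegalStateException("no ordinary or overdraft buffer left");
        }
    }

    /** Returns one buffer; overdraft buffers are paid back first. */
    void recycle() {
        if (overdraftInUse > 0) {
            overdraftInUse--;
        } else if (ordinaryInUse > 0) {
            ordinaryInUse--;
        } else {
            throw new IllegalStateException("nothing to recycle");
        }
    }

    /** Assumption of this sketch: growing the pool re-labels overdraft buffers as ordinary. */
    void setPoolSize(int newSize) {
        poolSize = newSize;
        while (overdraftInUse > 0 && ordinaryInUse < poolSize) {
            overdraftInUse--;
            ordinaryInUse++;
        }
    }

    int getOverdraftInUse() {
        return overdraftInUse;
    }

    /** Availability as argued in the review: free capacity and no outstanding overdraft. */
    boolean isAvailable() {
        return freeOrdinary() > 0 && overdraftInUse == 0;
    }

    /** Exhausts a pool of smallSize plus all overdraft buffers, then grows it to largeSize. */
    static ToyBufferPool exhaustThenGrow(int smallSize, int maxOverdraft, int largeSize) {
        ToyBufferPool pool = new ToyBufferPool(smallSize, maxOverdraft);
        for (int i = 0; i < smallSize + maxOverdraft; i++) {
            pool.request();
        }
        pool.setPoolSize(largeSize);
        return pool;
    }

    public static void main(String[] args) {
        // Sizes 7 and 8 sit on the two transitions; 6 and 10 sit on either side of them.
        for (int large : new int[] {6, 7, 8, 10}) {
            ToyBufferPool pool = exhaustThenGrow(5, 2, large);
            System.out.println(
                    large
                            + ": overdraft="
                            + pool.getOverdraftInUse()
                            + ", available="
                            + pool.isAvailable());
        }
    }
}
```

In a real test the same four sizes could be fed to one shared helper, so that each boundary gets its own assertions on the overdraft count and on `isAvailable()`.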