1996fanrui commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165094621
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
void testDecreasePoolSize() throws Exception {
final int maxMemorySegments = 10;
final int requiredMemorySegments = 4;
- final int maxOverdraftBuffers = 2;
- final int largePoolSize = 5;
- final int smallPoolSize = 4;
+
+ // requested buffers is equal to small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+ // requested buffers is less than small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+ // exceed buffers is equal to maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+ // exceed buffers is greater than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+ // exceed buffers is less than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+ // decrease pool size with overdraft buffer.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
Review Comment:
The last two tests both cover the same case, `exceed buffers is less than
maxOverdraftBuffers`, right? Could the last one be removed?
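
To make the overlap concrete, here is a minimal sketch. It assumes that "exceed buffers" means the buffers requested beyond the decreased pool size, i.e. `numBuffersToRequest - smallPoolSize`; that reading is inferred from the test comments and is not confirmed by the PR. The class and method names are hypothetical.

```java
// Hypothetical helper, not part of LocalBufferPoolTest.
final class ExceedBufferCases {

    // Assumption: exceed buffers = buffers requested beyond the decreased pool size.
    static int exceedBuffers(int smallPoolSize, int numBuffersToRequest) {
        return Math.max(0, numBuffersToRequest - smallPoolSize);
    }

    // Classifies a test case by comparing its exceed buffers with maxOverdraftBuffers.
    static String classify(int smallPoolSize, int maxOverdraftBuffers, int numBuffersToRequest) {
        int exceed = exceedBuffers(smallPoolSize, numBuffersToRequest);
        if (exceed == 0) {
            return "none";
        }
        if (exceed < maxOverdraftBuffers) {
            return "less";
        }
        return exceed == maxOverdraftBuffers ? "equal" : "greater";
    }
}
```

Under that assumption, the last two calls (`smallPoolSize = 5`, `maxOverdraftBuffers = 4`, 7 requested, and `smallPoolSize = 5`, `maxOverdraftBuffers = 6`, 9 requested) both classify as `less`.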
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -49,7 +49,11 @@
*
* <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In
Review Comment:
Yes, we cannot return these buffers while the task is using them.
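
For reference, the condition stated in the new Javadoc can be written as a small predicate. This is only a sketch of the documented inequality, not the actual `LocalBufferPool` code; the class and method names are hypothetical, and the parameters mirror the field names quoted in the Javadoc.

```java
// Hypothetical sketch of the condition quoted in the Javadoc above.
final class RequestCondition {

    // True when the Javadoc's inequality allows another buffer to be requested.
    static boolean canRequestNewBuffer(
            int numberOfRequestedMemorySegments,
            int numberOfRequestedOverdraftMemorySegments,
            int currentPoolSize,
            int maxOverdraftBuffersPerGate) {
        return numberOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments
                < currentPoolSize + maxOverdraftBuffersPerGate;
    }
}
```

For example, with 5 ordinary and 2 overdraft buffers still held by the task, a pool size of 5, and `maxOverdraftBuffersPerGate = 2`, the predicate is false, so no new buffer can be requested until the task recycles one.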
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
void testDecreasePoolSize() throws Exception {
final int maxMemorySegments = 10;
final int requiredMemorySegments = 4;
- final int maxOverdraftBuffers = 2;
- final int largePoolSize = 5;
- final int smallPoolSize = 4;
+
+ // requested buffers is equal to small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+ // requested buffers is less than small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+ // exceed buffers is equal to maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+ // exceed buffers is greater than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+ // exceed buffers is less than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+ // decrease pool size with overdraft buffer.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
+ }
+
+ void testDecreasePoolSizeInternal(
+ int maxMemorySegments,
+ int requiredMemorySegments,
+ int largePoolSize,
+ int smallPoolSize,
+ int maxOverdraftBuffers,
+ int numBuffersToRequest,
+ int numOverdraftBuffersAfterDecreasePoolSize,
+ int numRequestedBuffersAfterDecreasePoolSize,
Review Comment:
The parameter names may need to be renamed; they are not very clear. In
particular, I read `numRequestedBuffersAfterDecreasePoolSize` as the total
number of buffers (ordinary + overdraft) that the client requested from the
LocalBufferPool. How about renaming them to
`numRequestedOverdraftBuffersAfterDecreasing` and
`numRequestedOrdinaryBuffersAfterDecreasing`? I dropped `PoolSize` because the
method name already includes `DecreasePoolSize` and the parameter names would
otherwise be too long.
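
A small sketch of how the proposed names separate the two counts. The holder class is hypothetical and only illustrates the naming; it is not a suggestion to change the test's structure.

```java
// Hypothetical holder that only illustrates the proposed parameter names.
final class ExpectedAfterDecreasing {
    final int numRequestedOverdraftBuffersAfterDecreasing;
    final int numRequestedOrdinaryBuffersAfterDecreasing;

    ExpectedAfterDecreasing(
            int numRequestedOverdraftBuffersAfterDecreasing,
            int numRequestedOrdinaryBuffersAfterDecreasing) {
        this.numRequestedOverdraftBuffersAfterDecreasing =
                numRequestedOverdraftBuffersAfterDecreasing;
        this.numRequestedOrdinaryBuffersAfterDecreasing =
                numRequestedOrdinaryBuffersAfterDecreasing;
    }

    // Total buffers held by the client: ordinary plus overdraft.
    int totalRequestedBuffers() {
        return numRequestedOverdraftBuffersAfterDecreasing
                + numRequestedOrdinaryBuffersAfterDecreasing;
    }
}
```

With these names, the total I first assumed the old parameter meant becomes an explicit sum of the two values instead of being implied by one ambiguous name.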
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
@GuardedBy("availableMemorySegments")
private boolean hasExcessBuffers() {
- return numberOfRequestedOverdraftMemorySegments > 0
- || numberOfRequestedMemorySegments > currentPoolSize;
+ return numberOfRequestedOverdraftMemorySegments > 0;
Review Comment:
I have checked, and it looks fine to me. However, I would prefer to invite more
experts to review this PR so that the result is more reliable.
Hi @pnowojski @akalash, would you mind taking a look at this PR in your free
time? Thanks.