This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new be0f9293c2c [FLINK-31763][runtime] Convert requested buffers to
overdraft buffers when pool size is decreased
be0f9293c2c is described below
commit be0f9293c2ce00465154ef03f7cef29dd3116b8e
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Apr 11 23:33:57 2023 +0800
[FLINK-31763][runtime] Convert requested buffers to overdraft buffers when
pool size is decreased
---
.../runtime/io/network/buffer/LocalBufferPool.java | 14 +++-
.../io/network/buffer/LocalBufferPoolTest.java | 85 ++++++++++++----------
2 files changed, 59 insertions(+), 40 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 87b8a11e2aa..6506ab9f942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -49,7 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
*
* <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In
+ * order to meet this requirement, when the size of this pool changes,
+ * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to
+ * each other.
*
* <p>Availability is defined as returning a non-overdraft segment on a subsequent {@link
* #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link
@@ -671,13 +675,19 @@ class LocalBufferPool implements BufferPool {
currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
- // reset overdraft buffers
+ // If pool size increases, try to convert overdraft buffer to ordinary buffer.
while (numberOfRequestedOverdraftMemorySegments > 0
&& numberOfRequestedMemorySegments < currentPoolSize) {
numberOfRequestedOverdraftMemorySegments--;
numberOfRequestedMemorySegments++;
}
+ // If pool size decreases, try to convert ordinary buffer to overdraft buffer.
+ while (numberOfRequestedMemorySegments > currentPoolSize) {
+ numberOfRequestedMemorySegments--;
+ numberOfRequestedOverdraftMemorySegments++;
+ }
+
returnExcessMemorySegments();
if (isDestroyed) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 956d55e5948..6c0fcf13b47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -38,10 +38,8 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -255,9 +253,38 @@ class LocalBufferPoolTest {
void testDecreasePoolSize() throws Exception {
final int maxMemorySegments = 10;
final int requiredMemorySegments = 4;
- final int maxOverdraftBuffers = 2;
- final int largePoolSize = 5;
- final int smallPoolSize = 4;
+
+ // requested buffers is equal to small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0);
+ // requested buffers is less than small pool size.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1);
+ // exceed buffers is equal to maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0);
+ // exceed buffers is greater than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0);
+ // exceed buffers is less than maxOverdraftBuffers
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0);
+ // decrease pool size with overdraft buffer.
+ testDecreasePoolSizeInternal(
+ maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0);
+ }
+
+ void testDecreasePoolSizeInternal(
+ int maxMemorySegments,
+ int requiredMemorySegments,
+ int largePoolSize,
+ int smallPoolSize,
+ int maxOverdraftBuffers,
+ int numBuffersToRequest,
+ int numRequestedOverdraftBuffersAfterDecreasing,
+ int numRequestedOrdinaryBuffersAfterDecreasing,
+ int numAvailableBuffersAfterDecreasing)
+ throws Exception {
LocalBufferPool bufferPool =
new LocalBufferPool(
networkBufferPool,
@@ -266,51 +293,33 @@ class LocalBufferPoolTest {
0,
Integer.MAX_VALUE,
maxOverdraftBuffers);
- Queue<MemorySegment> buffers = new LinkedList<>();
+ List<MemorySegment> buffers = new ArrayList<>();
// set a larger pool size.
bufferPool.setNumBuffers(largePoolSize);
assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
- // request all buffer.
- for (int i = 0; i < largePoolSize; i++) {
+ // request buffers.
+ for (int i = 0; i < numBuffersToRequest; i++) {
buffers.add(bufferPool.requestMemorySegmentBlocking());
}
- assertThat(bufferPool.isAvailable()).isFalse();
-
- // request 1 overdraft buffers.
- buffers.add(bufferPool.requestMemorySegmentBlocking());
- assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
- assertThat(bufferPool.isAvailable()).isFalse();
// set a small pool size.
bufferPool.setNumBuffers(smallPoolSize);
assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
- assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
- assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
- assertThat(bufferPool.isAvailable()).isFalse();
- buffers.add(bufferPool.requestMemorySegmentBlocking());
- assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
- assertThat(bufferPool.isAvailable()).isFalse();
-
- // return all overdraft buffers.
- bufferPool.recycle(buffers.poll());
- assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
- assertThat(bufferPool.isAvailable()).isFalse();
- bufferPool.recycle(buffers.poll());
- assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
- assertThat(bufferPool.isAvailable()).isFalse();
-
- // return the excess buffer.
- bufferPool.recycle(buffers.poll());
- assertThat(bufferPool.isAvailable()).isFalse();
- // return non-excess buffers.
- bufferPool.recycle(buffers.poll());
- assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
- assertThat(bufferPool.isAvailable()).isTrue();
+ assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+ .isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
+ assertThat(
+ bufferPool.bestEffortGetNumOfUsedBuffers()
+ + bufferPool.getNumberOfAvailableMemorySegments()
+ - bufferPool.getNumberOfRequestedOverdraftMemorySegments())
+ .isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
+ assertThat(bufferPool.getNumberOfAvailableMemorySegments())
+ .isEqualTo(numAvailableBuffersAfterDecreasing);
+ assertThat(bufferPool.isAvailable()).isEqualTo(numAvailableBuffersAfterDecreasing > 0);
- while (!buffers.isEmpty()) {
- bufferPool.recycle(buffers.poll());
+ for (MemorySegment buffer : buffers) {
+ bufferPool.recycle(buffer);
}
bufferPool.lazyDestroy();
}