This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c6d7747eaef [FLINK-31639][network] Introduce the memory manager for
tiered storage
c6d7747eaef is described below
commit c6d7747eaef166fb7577de55cb2943fa5408d54e
Author: Yuxin Tan <[email protected]>
AuthorDate: Sun Apr 23 12:55:40 2023 +0800
[FLINK-31639][network] Introduce the memory manager for tiered storage
This closes #22352
---
.../runtime/io/network/buffer/LocalBufferPool.java | 2 +-
.../tiered/storage/TieredStorageMemoryManager.java | 108 ++++++++
.../storage/TieredStorageMemoryManagerImpl.java | 282 +++++++++++++++++++++
.../tiered/storage/TieredStorageMemorySpec.java | 45 ++++
.../TieredStorageMemoryManagerImplTest.java | 282 +++++++++++++++++++++
5 files changed, 718 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6d6d236f902..873414c6fe2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -68,7 +68,7 @@ import static
org.apache.flink.util.concurrent.FutureUtils.assertNoException;
* {@link NetworkBufferPool} as long as it hasn't reached {@link
#maxNumberOfMemorySegments} or one
* subpartition reached the quota.
*/
-class LocalBufferPool implements BufferPool {
+public class LocalBufferPool implements BufferPool {
private static final Logger LOG =
LoggerFactory.getLogger(LocalBufferPool.class);
private static final int UNKNOWN_CHANNEL = -1;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
new file mode 100644
index 00000000000..87835c61278
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers
from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the
buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent
for these owners.
+ *
+ * <p>The buffers managed by {@link TieredStorageMemoryManager} are categorized into two types:
+ * <b>non-reclaimable</b> buffers, which cannot be released immediately, and <b>reclaimable</b>
+ * buffers, which can be reclaimed quickly and safely. Releasing a non-reclaimable buffer requires
+ * waiting for other operations to complete, such as downstream consumption. Reclaimable buffers,
+ * on the other hand, can be freed at any time, enabling rapid memory recycling for tasks such as
+ * flushing memory to disk or remote storage.
+ *
+ * <p>The {@link TieredStorageMemoryManager} does not enforce strict limits on the number of
+ * buffers any user can request. Instead, it only provides memory usage hints to memory users. It
+ * is very <b>important</b> to note that <b>only</b> users of non-reclaimable buffers should check
+ * the memory hints by calling {@code getMaxNonReclaimableBuffers} before requesting buffers.
+ */
+public interface TieredStorageMemoryManager {
+
+ /**
+ * Setup the {@link TieredStorageMemoryManager}. When setting up the
manager, the {@link
+ * TieredStorageMemorySpec}s for different tiered storages should be ready
to indicate each
+ * tiered storage's memory requirement specs.
+ *
+ * @param bufferPool the local buffer pool
+ * @param storageMemorySpecs the memory specs for different tiered storages
+ */
+ void setup(BufferPool bufferPool, List<TieredStorageMemorySpec>
storageMemorySpecs);
+
+ /**
+ * Register a listener for buffer reclaim requests from the {@link
+ * TieredStorageMemoryManager}.
+ *
+ * <p>When the remaining buffers in the {@link BufferPool} are not enough, the {@link
+ * TieredStorageMemoryManager} will try to reclaim buffers from the memory owners.
+ *
+ * @param onBufferReclaimRequest a {@link Runnable} to process the buffer
reclaim request
+ */
+ void listenBufferReclaimRequest(Runnable onBufferReclaimRequest);
+
+ /**
+ * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The
+ * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested. The
+ * manager only records the number of requested buffers. If the buffers in the {@link
+ * BufferPool} are not enough, the manager will request each tiered storage to reclaim as many
+ * of its requested buffers as possible.
+ *
+ * <p>This is not thread safe and is expected to be called only from the
task thread.
+ *
+ * @param owner the owner to request buffer
+ * @return the requested buffer
+ */
+ BufferBuilder requestBufferBlocking(Object owner);
+
+ /**
+ * Return the maximum number of non-reclaimable buffers that the owner may hold.
+ *
+ * <p>Note that the available buffers are calculated dynamically based on
some conditions, for
+ * example, the state of the {@link BufferPool}, the {@link
TieredStorageMemorySpec} of the
+ * owner, etc. So the caller should always check before requesting
non-reclaimable buffers.
+ *
+ * <p>When invoking this method, the caller should be aware that the
return value may
+ * occasionally be negative. This is due to the possibility of the buffer
pool size shrinking to
+ * a point where it is smaller than the buffers owned by other users. In
such cases, the maximum
+ * non-reclaimable buffer value returned may be negative.
+ */
+ int getMaxNonReclaimableBuffers(Object owner);
+
+ /**
+ * Return the number of requested buffers belonging to a specific owner.
+ *
+ * @param owner the owner of requesting buffers
+ * @return the number of requested buffers belonging to the owner.
+ */
+ int numOwnerRequestedBuffer(Object owner);
+
+ /**
+ * Release all the resources (if any) and check the state of the {@link
+ * TieredStorageMemoryManager}.
+ */
+ void release();
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
new file mode 100644
index 00000000000..ddc86c1c78c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to
request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the
tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the {@link TieredStorageMemorySpec}s of the tiered storages
should be ready when
+ * setting up the memory manager. Only after the setup process is finished,
the tiered storage can
+ * request buffers from this manager.
+ */
+public class TieredStorageMemoryManagerImpl implements
TieredStorageMemoryManager {
+
+ /** Time to wait for requesting new buffers before triggering buffer
reclaiming. */
+ private static final int INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS
= 50;
+
+ /** The tiered storage memory specs of each memory user owner. */
+ private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+ /** Listeners notified of requests to reclaim buffers in the different tiered storages. */
+ private final List<Runnable> bufferReclaimRequestListeners;
+
+ /** The buffer pool usage ratio of triggering the registered storages to
reclaim buffers. */
+ private final float numTriggerReclaimBuffersRatio;
+
+ /**
+ * Indicates whether reclaiming of buffers is supported. If supported,
when there's a
+ * contention, we may try reclaim buffers from the memory owners.
+ */
+ private final boolean mayReclaimBuffer;
+
+ /**
+ * The number of requested buffers from {@link BufferPool}. This field can
be touched both by
+ * the task thread and the netty thread, so it is an atomic type.
+ */
+ private final AtomicInteger numRequestedBuffers;
+
+ /**
+ * The number of requested buffers from {@link BufferPool} for each memory
owner. This field
+ * should be thread-safe because it can be touched both by the task thread
and the netty thread.
+ */
+ private final Map<Object, AtomicInteger> numOwnerRequestedBuffers;
+
+ /**
+ * This is for triggering buffer reclaiming while blocked on requesting
new buffers.
+ *
+ * <p>Note: This can be null iff buffer reclaiming is not supported.
+ */
+ @Nullable private ScheduledExecutorService executor;
+
+ /** The buffer pool where the buffer is requested or recycled. */
+ private BufferPool bufferPool;
+
+ /**
+ * Indicate whether the {@link TieredStorageMemoryManagerImpl} is
initialized. Before setting
+ * up, this field is false.
+ *
+ * <p>Note that before requesting buffers or getting the maximum allowed
buffers, this
+ * initialized state should be checked.
+ */
+ private boolean isInitialized;
+
+ /**
+ * The constructor of the {@link TieredStorageMemoryManagerImpl}.
+ *
+ * @param numTriggerReclaimBuffersRatio the buffer pool usage ratio of
requesting each tiered
+ * storage to reclaim buffers
+ * @param mayReclaimBuffer indicate whether buffer reclaiming is supported
+ */
+ public TieredStorageMemoryManagerImpl(
+ float numTriggerReclaimBuffersRatio, boolean mayReclaimBuffer) {
+ this.numTriggerReclaimBuffersRatio = numTriggerReclaimBuffersRatio;
+ this.mayReclaimBuffer = mayReclaimBuffer;
+ this.tieredMemorySpecs = new HashMap<>();
+ this.numRequestedBuffers = new AtomicInteger(0);
+ this.numOwnerRequestedBuffers = new ConcurrentHashMap<>();
+ this.bufferReclaimRequestListeners = new ArrayList<>();
+ this.isInitialized = false;
+ }
+
+ @Override
+ public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec>
storageMemorySpecs) {
+ this.bufferPool = bufferPool;
+ for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) {
+ checkState(
+ !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
+ "Duplicated memory spec.");
+ tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+ }
+
+ if (mayReclaimBuffer) {
+ this.executor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("buffer reclaim checker")
+
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+ .build());
+ }
+
+ this.isInitialized = true;
+ }
+
+ @Override
+ public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
+ bufferReclaimRequestListeners.add(onBufferReclaimRequest);
+ }
+
+ /**
+ * Requests a {@link BufferBuilder} for the given owner, blocking until the {@link BufferPool}
+ * hands out a memory segment.
+ *
+ * <p>Before blocking, the registered reclaim listeners are triggered if the usage check asks
+ * for it. While the request is pending, a delayed check (with a delay that doubles after each
+ * run) keeps re-triggering reclaiming, provided buffer reclaiming is supported.
+ *
+ * @param owner the owner the requested buffer is accounted to
+ * @return a buffer builder whose recycler returns the segment to the pool and decrements the
+ * counters of the owner
+ */
+ @Override
+ public BufferBuilder requestBufferBlocking(Object owner) {
+ checkIsInitialized();
+
+ // Give the memory owners a chance to free buffers before this thread potentially blocks.
+ reclaimBuffersIfNeeded();
+
+ // As long as this future is incomplete, the reclaim checker keeps rescheduling itself.
+ CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
+ scheduleCheckRequestBufferFuture(
+ requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
+ MemorySegment memorySegment = null;
+ try {
+ memorySegment = bufferPool.requestMemorySegmentBlocking();
+ } catch (InterruptedException e) {
+ ExceptionUtils.rethrow(e);
+ } finally {
+ // Complete the future on failure as well. Otherwise the periodic check would never see
+ // the request as finished and would keep invoking the reclaim listeners for a request
+ // that has already been abandoned.
+ requestBufferFuture.complete(null);
+ }
+
+ incNumRequestedBuffer(owner);
+ return new BufferBuilder(
+ checkNotNull(memorySegment), segment -> recycleBuffer(owner, segment));
+ }
+
+ @Override
+ public int getMaxNonReclaimableBuffers(Object owner) {
+ checkIsInitialized();
+
+ int numBuffersUsedOrReservedForOtherOwners = 0;
+ for (Map.Entry<Object, TieredStorageMemorySpec> memorySpecEntry :
+ tieredMemorySpecs.entrySet()) {
+ Object userOwner = memorySpecEntry.getKey();
+ TieredStorageMemorySpec storageMemorySpec =
memorySpecEntry.getValue();
+ if (!userOwner.equals(owner)) {
+ int numGuaranteed =
storageMemorySpec.getNumGuaranteedBuffers();
+ int numRequested = numOwnerRequestedBuffer(userOwner);
+ numBuffersUsedOrReservedForOtherOwners +=
Math.max(numGuaranteed, numRequested);
+ }
+ }
+ // Note that a sudden reduction in the size of the buffer pool may
result in non-reclaimable
+ // buffer memory occupying the guaranteed buffers of other users.
However, this occurrence
+ // is limited to the memory tier, which is only utilized when
downstream registration is in
+ // effect. Furthermore, the buffers within the memory tier can be
recycled quickly enough,
+ // thereby minimizing the impact on the guaranteed buffers of other
tiers.
+ return bufferPool.getNumBuffers() -
numBuffersUsedOrReservedForOtherOwners;
+ }
+
+ @Override
+ public int numOwnerRequestedBuffer(Object owner) {
+ AtomicInteger numRequestedBuffer = numOwnerRequestedBuffers.get(owner);
+ return numRequestedBuffer == null ? 0 : numRequestedBuffer.get();
+ }
+
+ /**
+ * Verifies that every requested buffer has been recycled and shuts down the reclaim checker
+ * executor, if one was created during setup.
+ *
+ * <p>Fails with an {@link IllegalStateException} ("Leaking buffers.") if buffers are still
+ * outstanding. Waits up to five minutes for the executor to terminate; a timeout or an
+ * interruption is rethrown via {@link ExceptionUtils#rethrow(Throwable)}.
+ */
+ @Override
+ public void release() {
+ checkState(numRequestedBuffers.get() == 0, "Leaking buffers.");
+ if (executor != null) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5L, TimeUnit.MINUTES)) {
+ throw new TimeoutException(
+ "Timeout for shutting down the buffer reclaim checker executor.");
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so that code further up the stack can still observe
+ // the interruption after the exception has been wrapped and rethrown.
+ Thread.currentThread().interrupt();
+ ExceptionUtils.rethrow(e);
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
+ }
+
+ /**
+ * Schedules a delayed check of a pending buffer request on the reclaim checker executor.
+ *
+ * <p>Does nothing if buffer reclaiming is not supported or the request has already finished.
+ *
+ * @param requestBufferFuture future that is completed once the buffer request has finished
+ * @param delayMs delay in milliseconds before the check runs; the following check, if any, is
+ * scheduled with twice this delay
+ */
+ private void scheduleCheckRequestBufferFuture(
+ CompletableFuture<Void> requestBufferFuture, long delayMs) {
+ if (!mayReclaimBuffer || requestBufferFuture.isDone()) {
+ return;
+ }
+ checkNotNull(executor)
+ .schedule(
+ // The delay time will be doubled after each check to
avoid checking the
+ // future too frequently.
+ () ->
internalCheckRequestBufferFuture(requestBufferFuture, delayMs * 2),
+ delayMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Body of the scheduled check: if the buffer request is still pending, triggers buffer
+ * reclaiming when needed and schedules the next check.
+ *
+ * @param requestBufferFuture future that is completed once the buffer request has finished
+ * @param delayForNextCheckMs delay in milliseconds to use for the next scheduled check
+ */
+ private void internalCheckRequestBufferFuture(
+ CompletableFuture<Void> requestBufferFuture, long
delayForNextCheckMs) {
+ if (requestBufferFuture.isDone()) {
+ return;
+ }
+ reclaimBuffersIfNeeded();
+ scheduleCheckRequestBufferFuture(requestBufferFuture,
delayForNextCheckMs);
+ }
+
+ /**
+ * Records one more requested buffer: increments the per-owner counter (created on the owner's
+ * first request) and the total counter.
+ *
+ * @param owner the owner the buffer is accounted to
+ */
+ private void incNumRequestedBuffer(Object owner) {
+ numOwnerRequestedBuffers
+ .computeIfAbsent(owner, ignore -> new AtomicInteger(0))
+ .incrementAndGet();
+ numRequestedBuffers.incrementAndGet();
+ }
+
+ /**
+ * Records one recycled buffer: decrements the per-owner counter and the total counter.
+ *
+ * <p>The owner must already have a counter, i.e. it must have requested a buffer before;
+ * otherwise the {@code checkNotNull} guard fails.
+ *
+ * @param owner the owner the recycled buffer was accounted to
+ */
+ private void decNumRequestedBuffer(Object owner) {
+ AtomicInteger numOwnerRequestedBuffer =
numOwnerRequestedBuffers.get(owner);
+ checkNotNull(numOwnerRequestedBuffer).decrementAndGet();
+ numRequestedBuffers.decrementAndGet();
+ }
+
+ /**
+ * Runs every registered reclaim listener on the calling thread if the buffer usage check says
+ * that reclaiming should be triggered; otherwise does nothing.
+ */
+ private void reclaimBuffersIfNeeded() {
+ if (shouldReclaimBuffersBeforeRequesting()) {
+ bufferReclaimRequestListeners.forEach(Runnable::run);
+ }
+ }
+
+ /**
+ * Decides whether the memory owners should be asked to reclaim buffers before the next buffer
+ * is requested.
+ *
+ * @return true if the number of requested buffers has reached the current size of the buffer
+ * pool, or if the usage ratio, counting the buffer about to be requested, exceeds {@code
+ * numTriggerReclaimBuffersRatio}
+ */
+ private boolean shouldReclaimBuffersBeforeRequesting() {
+ // The accuracy of the memory usage ratio may be compromised due to
the varying buffer pool
+ // sizes. However, this only impacts a single iteration of the buffer
usage check. Upon the
+ // next iteration, the buffer reclaim will eventually be triggered.
+ int numTotal = bufferPool.getNumBuffers();
+ int numRequested = numRequestedBuffers.get();
+ return numRequested >= numTotal
+ // Because we do the checking before requesting buffers, we
need add additional one
+ // buffer when calculating the usage ratio.
+ || ((numRequested + 1) * 1.0 / numTotal) >
numTriggerReclaimBuffersRatio;
+ }
+
+ /**
+ * Returns the memory segment to the buffer pool and then decrements the requested-buffer
+ * counters of the owner.
+ *
+ * <p>Note that this method may be called by the netty thread.
+ *
+ * @param owner the owner the buffer was accounted to
+ * @param buffer the memory segment to hand back to the buffer pool
+ */
+ private void recycleBuffer(Object owner, MemorySegment buffer) {
+ bufferPool.recycle(buffer);
+ decNumRequestedBuffer(owner);
+ }
+
+ private void checkIsInitialized() {
+ checkState(isInitialized, "The memory manager is not in the running
state.");
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
new file mode 100644
index 00000000000..6a2b9b71f9f
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/**
+ * The memory specs for a memory owner, including the owner itself, the number
of guaranteed buffers
+ * of the memory owner, etc.
+ */
+public class TieredStorageMemorySpec {
+
+ /** The memory use owner. */
+ private final Object owner;
+
+ /** The number of guaranteed buffers of this memory owner. */
+ private final int numGuaranteedBuffers;
+
+ public TieredStorageMemorySpec(Object owner, int numGuaranteedBuffers) {
+ this.owner = owner;
+ this.numGuaranteedBuffers = numGuaranteedBuffers;
+ }
+
+ public Object getOwner() {
+ return owner;
+ }
+
+ public int getNumGuaranteedBuffers() {
+ return numGuaranteedBuffers;
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
new file mode 100644
index 00000000000..e4591bd0021
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageMemoryManagerImpl}. */
+public class TieredStorageMemoryManagerImplTest {
+
+ private static final int NETWORK_BUFFER_SIZE = 1024;
+
+ private static final int NUM_TOTAL_BUFFERS = 1000;
+
+ private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+ private NetworkBufferPool globalPool;
+
+ private List<BufferBuilder> requestedBuffers;
+
+ private CompletableFuture<Void> hasReclaimBufferFinished;
+
+ private int reclaimBufferCounter;
+
+ @BeforeEach
+ void before() {
+ globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS,
NETWORK_BUFFER_SIZE);
+ requestedBuffers = new ArrayList<>();
+ hasReclaimBufferFinished = new CompletableFuture<>();
+ reclaimBufferCounter = 0;
+ }
+
+ @AfterEach
+ void after() {
+ globalPool.destroy();
+ }
+
+ @Test
+ void testRequestAndRecycleBuffers() throws IOException {
+ int numBuffers = 1;
+
+ BufferPool bufferPool = globalPool.createBufferPool(numBuffers,
numBuffers);
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(
+ bufferPool,
+ Collections.singletonList(new
TieredStorageMemorySpec(this, 0)));
+ assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
+ BufferBuilder builder =
storageMemoryManager.requestBufferBlocking(this);
+ assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
+ recycleBufferBuilder(builder);
+ assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
+ storageMemoryManager.release();
+ }
+
+ @Test
+ void testGetMaxNonReclaimableBuffers() throws IOException {
+ int numBuffers = 10;
+ int numExclusive = 5;
+
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(
+ numBuffers,
+ Collections.singletonList(new
TieredStorageMemorySpec(this, numExclusive)));
+
+ List<BufferBuilder> requestedBuffers = new ArrayList<>();
+ for (int i = 1; i <= numBuffers; i++) {
+
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+ .isEqualTo(numBuffers);
+ int numExpectedAvailable = numBuffers - i;
+ assertThat(
+
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+ -
storageMemoryManager.numOwnerRequestedBuffer(this))
+ .isEqualTo(numExpectedAvailable);
+ }
+
+
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+ storageMemoryManager.release();
+ }
+
+ @Test
+ void testNumMaxNonReclaimableWhenOtherUseLessThanGuaranteed() throws
IOException {
+ int numBuffers = 10;
+ int numExclusive = 4;
+
+ List<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<>();
+ Object otherUser = new Object();
+ storageMemorySpecs.add(new TieredStorageMemorySpec(this, 0));
+ storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser,
numExclusive));
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(numBuffers, storageMemorySpecs);
+
+ List<BufferBuilder> requestedBuffers = new ArrayList<>();
+ assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+ .isEqualTo(numBuffers - numExclusive);
+ for (int i = 1; i <= numBuffers; i++) {
+
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+ .isEqualTo(numBuffers - numExclusive);
+ int numExpectedAvailable = numBuffers - i - numExclusive;
+ assertThat(
+
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+ -
storageMemoryManager.numOwnerRequestedBuffer(this))
+ .isEqualTo(numExpectedAvailable);
+ }
+
+
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+ storageMemoryManager.release();
+ }
+
+ @Test
+ void testNumMaxNonReclaimableWhenOtherUseMoreThanGuaranteed() throws
IOException {
+ int numBuffers = 10;
+ int numExclusive = 4;
+
+ List<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<>();
+ Object otherUser = new Object();
+ storageMemorySpecs.add(new TieredStorageMemorySpec(this, 0));
+ storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser,
numExclusive));
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(numBuffers, storageMemorySpecs);
+
+ int numRequestedByOtherUser = numExclusive + 1;
+ for (int i = 0; i < numRequestedByOtherUser; i++) {
+
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(otherUser));
+ }
+
+ assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+ .isEqualTo(numBuffers - numRequestedByOtherUser);
+ for (int i = 1; i <= numBuffers - numRequestedByOtherUser; i++) {
+
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+ .isEqualTo(numBuffers - numRequestedByOtherUser);
+ int numExpectedAvailable = numBuffers - i -
numRequestedByOtherUser;
+ assertThat(
+
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+ -
storageMemoryManager.numOwnerRequestedBuffer(this))
+ .isEqualTo(numExpectedAvailable);
+ }
+ assertThat(storageMemoryManager.numOwnerRequestedBuffer(this))
+ .isEqualTo(numBuffers - numRequestedByOtherUser);
+ assertThat(storageMemoryManager.numOwnerRequestedBuffer(otherUser))
+ .isEqualTo(numRequestedByOtherUser);
+
+
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+ storageMemoryManager.release();
+ }
+
+ @Test
+ @Timeout(60)
+ void testTriggerReclaimBuffers() throws IOException {
+ int numBuffers = 5;
+
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(
+ numBuffers,
+ Collections.singletonList(new
TieredStorageMemorySpec(this, 0)));
+
storageMemoryManager.listenBufferReclaimRequest(this::onBufferReclaimRequest);
+
+ int numBuffersBeforeTriggerReclaim = (int) (numBuffers *
NUM_BUFFERS_TRIGGER_FLUSH_RATIO);
+ for (int i = 0; i < numBuffersBeforeTriggerReclaim; i++) {
+
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ }
+
+ assertThat(reclaimBufferCounter).isEqualTo(0);
+
assertThat(requestedBuffers.size()).isEqualTo(numBuffersBeforeTriggerReclaim);
+ requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ assertThatFuture(hasReclaimBufferFinished).eventuallySucceeds();
+ assertThat(reclaimBufferCounter).isEqualTo(1);
+ assertThat(requestedBuffers.size()).isEqualTo(1);
+ recycleRequestedBuffers();
+
+ storageMemoryManager.release();
+ }
+
+ @Test
+ void testReleaseBeforeRecyclingBuffers() throws IOException {
+ int numBuffers = 5;
+
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(
+ numBuffers,
+ Collections.singletonList(new
TieredStorageMemorySpec(this, 0)));
+ requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+
assertThatThrownBy(storageMemoryManager::release).isInstanceOf(IllegalStateException.class);
+ recycleRequestedBuffers();
+ storageMemoryManager.release();
+ }
+
+ @Test
+ void testLeakingBuffers() throws IOException {
+ int numBuffers = 10;
+
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ createStorageMemoryManager(
+ numBuffers,
+ Collections.singletonList(new
TieredStorageMemorySpec(this, 0)));
+
+ requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+ assertThatThrownBy(storageMemoryManager::release)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Leaking buffers");
+ recycleRequestedBuffers();
+ storageMemoryManager.release();
+ }
+
+ public void onBufferReclaimRequest() {
+ reclaimBufferCounter++;
+ recycleRequestedBuffers();
+ hasReclaimBufferFinished.complete(null);
+ }
+
+ private void recycleRequestedBuffers() {
+ requestedBuffers.forEach(
+ builder -> {
+ BufferConsumer bufferConsumer =
builder.createBufferConsumer();
+ Buffer buffer = bufferConsumer.build();
+ buffer.getRecycler().recycle(buffer.getMemorySegment());
+ });
+ requestedBuffers.clear();
+ }
+
+ private TieredStorageMemoryManagerImpl createStorageMemoryManager(
+ int numBuffersInBufferPool, List<TieredStorageMemorySpec>
storageMemorySpecs)
+ throws IOException {
+ BufferPool bufferPool =
+ globalPool.createBufferPool(numBuffersInBufferPool,
numBuffersInBufferPool);
+ return createStorageMemoryManager(bufferPool, storageMemorySpecs);
+ }
+
+ private TieredStorageMemoryManagerImpl createStorageMemoryManager(
+ BufferPool bufferPool, List<TieredStorageMemorySpec>
storageMemorySpecs) {
+ TieredStorageMemoryManagerImpl storageProducerMemoryManager =
+ new
TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
+ storageProducerMemoryManager.setup(bufferPool, storageMemorySpecs);
+ return storageProducerMemoryManager;
+ }
+
+ private static void recycleBufferBuilder(BufferBuilder bufferBuilder) {
+ BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
+ Buffer buffer = bufferConsumer.build();
+ NetworkBuffer networkBuffer =
+ new NetworkBuffer(
+ buffer.getMemorySegment(), buffer.getRecycler(),
buffer.getDataType());
+ networkBuffer.recycleBuffer();
+ }
+}