This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c6d7747eaef [FLINK-31639][network] Introduce the memory manager for 
tiered storage
c6d7747eaef is described below

commit c6d7747eaef166fb7577de55cb2943fa5408d54e
Author: Yuxin Tan <[email protected]>
AuthorDate: Sun Apr 23 12:55:40 2023 +0800

    [FLINK-31639][network] Introduce the memory manager for tiered storage
    
    This closes #22352
---
 .../runtime/io/network/buffer/LocalBufferPool.java |   2 +-
 .../tiered/storage/TieredStorageMemoryManager.java | 108 ++++++++
 .../storage/TieredStorageMemoryManagerImpl.java    | 282 +++++++++++++++++++++
 .../tiered/storage/TieredStorageMemorySpec.java    |  45 ++++
 .../TieredStorageMemoryManagerImplTest.java        | 282 +++++++++++++++++++++
 5 files changed, 718 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6d6d236f902..873414c6fe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -68,7 +68,7 @@ import static 
org.apache.flink.util.concurrent.FutureUtils.assertNoException;
  * {@link NetworkBufferPool} as long as it hasn't reached {@link 
#maxNumberOfMemorySegments} or one
  * subpartition reached the quota.
  */
-class LocalBufferPool implements BufferPool {
+public class LocalBufferPool implements BufferPool {
     private static final Logger LOG = 
LoggerFactory.getLogger(LocalBufferPool.class);
 
     private static final int UNKNOWN_CHANNEL = -1;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
new file mode 100644
index 00000000000..87835c61278
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers 
from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the 
buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent 
for these owners.
+ *
+ * <p>The buffers managed by {@link TieredStorageMemoryManager} is categorized 
into two types:
+ * <b>non-reclaimable</b> buffers which cannot be immediately released and 
<b>reclaimable
+ * buffers</b> which can be reclaimed quickly and safely. Non-reclaimable 
buffers necessitates
+ * waiting for other operations to complete before releasing it, such as 
downstream consumption. On
+ * the other hand, reclaimable buffers can be freed up at any time, enabling 
rapid memory recycling
+ * for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>The {@link TieredStorageMemoryManager} does not provide strict memory 
limitations on any user
+ * can request. Instead, it only simply provides memory usage hints to memory 
users. It is very
+ * <b>important</b> to note that <b>only</b> users with non-reclaimable should 
check the memory
+ * hints by calling {@code getMaxNonReclaimableBuffers} before requesting 
buffers.
+ */
+public interface TieredStorageMemoryManager {
+
+    /**
+     * Setup the {@link TieredStorageMemoryManager}. When setting up the 
manager, the {@link
+     * TieredStorageMemorySpec}s for different tiered storages should be ready 
to indicate each
+     * tiered storage's memory requirement specs.
+     *
+     * @param bufferPool the local buffer pool
+     * @param storageMemorySpecs the memory specs for different tiered storages
+     */
+    void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> 
storageMemorySpecs);
+
+    /**
+     * Register a listener to listen the buffer reclaim request from the {@link
+     * TieredStorageMemoryManager}.
+     *
+     * <p>When the left buffers in the {@link BufferPool} are not enough, 
{@link
+     * TieredStorageMemoryManager} will try to reclaim the buffers from the 
memory owners.
+     *
+     * @param onBufferReclaimRequest a {@link Runnable} to process the buffer 
reclaim request
+     */
+    void listenBufferReclaimRequest(Runnable onBufferReclaimRequest);
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link BufferPool} for a 
specific owner. The
+     * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer 
can be requested. The
+     * manager only records the number of requested buffers. If the buffers in 
the {@link
+     * BufferPool} is not enough, the manager will request each tiered storage 
to reclaim their
+     * requested buffers as much as possible.
+     *
+     * <p>This is not thread safe and is expected to be called only from the 
task thread.
+     *
+     * @param owner the owner to request buffer
+     * @return the requested buffer
+     */
+    BufferBuilder requestBufferBlocking(Object owner);
+
+    /**
+     * Return the number of the non-reclaimable buffers for the owner.
+     *
+     * <p>Note that the available buffers are calculated dynamically based on 
some conditions, for
+     * example, the state of the {@link BufferPool}, the {@link 
TieredStorageMemorySpec} of the
+     * owner, etc. So the caller should always check before requesting 
non-reclaimable buffers.
+     *
+     * <p>When invoking this method, the caller should be aware that the 
return value may
+     * occasionally be negative. This is due to the possibility of the buffer 
pool size shrinking to
+     * a point where it is smaller than the buffers owned by other users. In 
such cases, the maximum
+     * non-reclaimable buffer value returned may be negative.
+     */
+    int getMaxNonReclaimableBuffers(Object owner);
+
+    /**
+     * Return the number of requested buffers belonging to a specific owner.
+     *
+     * @param owner the owner of requesting buffers
+     * @return the number of requested buffers belonging to the owner.
+     */
+    int numOwnerRequestedBuffer(Object owner);
+
+    /**
+     * Release all the resources(if exists) and check the state of the {@link
+     * TieredStorageMemoryManager}.
+     */
+    void release();
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
new file mode 100644
index 00000000000..ddc86c1c78c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to 
request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the 
tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the {@link TieredStorageMemorySpec}s of the tiered storages 
should be ready when
+ * setting up the memory manager. Only after the setup process is finished, 
the tiered storage can
+ * request buffers from this manager.
+ */
+public class TieredStorageMemoryManagerImpl implements 
TieredStorageMemoryManager {
+
+    /** Time to wait for requesting new buffers before triggering buffer 
reclaiming. */
+    private static final int INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS 
= 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** Listeners used to listen the requests for reclaiming buffer in 
different tiered storage. */
+    private final List<Runnable> bufferReclaimRequestListeners;
+
+    /** The buffer pool usage ratio of triggering the registered storages to 
reclaim buffers. */
+    private final float numTriggerReclaimBuffersRatio;
+
+    /**
+     * Indicates whether reclaiming of buffers is supported. If supported, 
when there's a
+     * contention, we may try reclaim buffers from the memory owners.
+     */
+    private final boolean mayReclaimBuffer;
+
+    /**
+     * The number of requested buffers from {@link BufferPool}. This field can 
be touched both by
+     * the task thread and the netty thread, so it is an atomic type.
+     */
+    private final AtomicInteger numRequestedBuffers;
+
+    /**
+     * The number of requested buffers from {@link BufferPool} for each memory 
owner. This field
+     * should be thread-safe because it can be touched both by the task thread 
and the netty thread.
+     */
+    private final Map<Object, AtomicInteger> numOwnerRequestedBuffers;
+
+    /**
+     * This is for triggering buffer reclaiming while blocked on requesting 
new buffers.
+     *
+     * <p>Note: This can be null iff buffer reclaiming is not supported.
+     */
+    @Nullable private ScheduledExecutorService executor;
+
+    /** The buffer pool where the buffer is requested or recycled. */
+    private BufferPool bufferPool;
+
+    /**
+     * Indicate whether the {@link TieredStorageMemoryManagerImpl} is 
initialized. Before setting
+     * up, this field is false.
+     *
+     * <p>Note that before requesting buffers or getting the maximum allowed 
buffers, this
+     * initialized state should be checked.
+     */
+    private boolean isInitialized;
+
+    /**
+     * The constructor of the {@link TieredStorageMemoryManagerImpl}.
+     *
+     * @param numTriggerReclaimBuffersRatio the buffer pool usage ratio of 
requesting each tiered
+     *     storage to reclaim buffers
+     * @param mayReclaimBuffer indicate whether buffer reclaiming is supported
+     */
+    public TieredStorageMemoryManagerImpl(
+            float numTriggerReclaimBuffersRatio, boolean mayReclaimBuffer) {
+        this.numTriggerReclaimBuffersRatio = numTriggerReclaimBuffersRatio;
+        this.mayReclaimBuffer = mayReclaimBuffer;
+        this.tieredMemorySpecs = new HashMap<>();
+        this.numRequestedBuffers = new AtomicInteger(0);
+        this.numOwnerRequestedBuffers = new ConcurrentHashMap<>();
+        this.bufferReclaimRequestListeners = new ArrayList<>();
+        this.isInitialized = false;
+    }
+
+    @Override
+    public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> 
storageMemorySpecs) {
+        this.bufferPool = bufferPool;
+        for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) {
+            checkState(
+                    !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
+                    "Duplicated memory spec.");
+            tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+        }
+
+        if (mayReclaimBuffer) {
+            this.executor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ThreadFactoryBuilder()
+                                    .setNameFormat("buffer reclaim checker")
+                                    
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                                    .build());
+        }
+
+        this.isInitialized = true;
+    }
+
+    @Override
+    public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
+        bufferReclaimRequestListeners.add(onBufferReclaimRequest);
+    }
+
+    @Override
+    public BufferBuilder requestBufferBlocking(Object owner) {
+        checkIsInitialized();
+
+        reclaimBuffersIfNeeded();
+
+        CompletableFuture<Void> requestBufferFuture = new 
CompletableFuture<>();
+        scheduleCheckRequestBufferFuture(
+                requestBufferFuture, 
INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
+        MemorySegment memorySegment = null;
+        try {
+            memorySegment = bufferPool.requestMemorySegmentBlocking();
+        } catch (InterruptedException e) {
+            ExceptionUtils.rethrow(e);
+        }
+        requestBufferFuture.complete(null);
+
+        incNumRequestedBuffer(owner);
+        return new BufferBuilder(
+                checkNotNull(memorySegment), segment -> recycleBuffer(owner, 
segment));
+    }
+
+    @Override
+    public int getMaxNonReclaimableBuffers(Object owner) {
+        checkIsInitialized();
+
+        int numBuffersUsedOrReservedForOtherOwners = 0;
+        for (Map.Entry<Object, TieredStorageMemorySpec> memorySpecEntry :
+                tieredMemorySpecs.entrySet()) {
+            Object userOwner = memorySpecEntry.getKey();
+            TieredStorageMemorySpec storageMemorySpec = 
memorySpecEntry.getValue();
+            if (!userOwner.equals(owner)) {
+                int numGuaranteed = 
storageMemorySpec.getNumGuaranteedBuffers();
+                int numRequested = numOwnerRequestedBuffer(userOwner);
+                numBuffersUsedOrReservedForOtherOwners += 
Math.max(numGuaranteed, numRequested);
+            }
+        }
+        // Note that a sudden reduction in the size of the buffer pool may 
result in non-reclaimable
+        // buffer memory occupying the guaranteed buffers of other users. 
However, this occurrence
+        // is limited to the memory tier, which is only utilized when 
downstream registration is in
+        // effect. Furthermore, the buffers within the memory tier can be 
recycled quickly enough,
+        // thereby minimizing the impact on the guaranteed buffers of other 
tiers.
+        return bufferPool.getNumBuffers() - 
numBuffersUsedOrReservedForOtherOwners;
+    }
+
+    @Override
+    public int numOwnerRequestedBuffer(Object owner) {
+        AtomicInteger numRequestedBuffer = numOwnerRequestedBuffers.get(owner);
+        return numRequestedBuffer == null ? 0 : numRequestedBuffer.get();
+    }
+
+    @Override
+    public void release() {
+        checkState(numRequestedBuffers.get() == 0, "Leaking buffers.");
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                    throw new TimeoutException(
+                            "Timeout for shutting down the buffer reclaim 
checker executor.");
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+        }
+    }
+
+    private void scheduleCheckRequestBufferFuture(
+            CompletableFuture<Void> requestBufferFuture, long delayMs) {
+        if (!mayReclaimBuffer || requestBufferFuture.isDone()) {
+            return;
+        }
+        checkNotNull(executor)
+                .schedule(
+                        // The delay time will be doubled after each check to 
avoid checking the
+                        // future too frequently.
+                        () -> 
internalCheckRequestBufferFuture(requestBufferFuture, delayMs * 2),
+                        delayMs,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void internalCheckRequestBufferFuture(
+            CompletableFuture<Void> requestBufferFuture, long 
delayForNextCheckMs) {
+        if (requestBufferFuture.isDone()) {
+            return;
+        }
+        reclaimBuffersIfNeeded();
+        scheduleCheckRequestBufferFuture(requestBufferFuture, 
delayForNextCheckMs);
+    }
+
+    private void incNumRequestedBuffer(Object owner) {
+        numOwnerRequestedBuffers
+                .computeIfAbsent(owner, ignore -> new AtomicInteger(0))
+                .incrementAndGet();
+        numRequestedBuffers.incrementAndGet();
+    }
+
+    private void decNumRequestedBuffer(Object owner) {
+        AtomicInteger numOwnerRequestedBuffer = 
numOwnerRequestedBuffers.get(owner);
+        checkNotNull(numOwnerRequestedBuffer).decrementAndGet();
+        numRequestedBuffers.decrementAndGet();
+    }
+
+    private void reclaimBuffersIfNeeded() {
+        if (shouldReclaimBuffersBeforeRequesting()) {
+            bufferReclaimRequestListeners.forEach(Runnable::run);
+        }
+    }
+
+    private boolean shouldReclaimBuffersBeforeRequesting() {
+        // The accuracy of the memory usage ratio may be compromised due to 
the varying buffer pool
+        // sizes. However, this only impacts a single iteration of the buffer 
usage check. Upon the
+        // next iteration, the buffer reclaim will eventually be triggered.
+        int numTotal = bufferPool.getNumBuffers();
+        int numRequested = numRequestedBuffers.get();
+        return numRequested >= numTotal
+                // Because we do the checking before requesting buffers, we 
need add additional one
+                // buffer when calculating the usage ratio.
+                || ((numRequested + 1) * 1.0 / numTotal) > 
numTriggerReclaimBuffersRatio;
+    }
+
+    /** Note that this method may be called by the netty thread. */
+    private void recycleBuffer(Object owner, MemorySegment buffer) {
+        bufferPool.recycle(buffer);
+        decNumRequestedBuffer(owner);
+    }
+
+    private void checkIsInitialized() {
+        checkState(isInitialized, "The memory manager is not in the running 
state.");
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
new file mode 100644
index 00000000000..6a2b9b71f9f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/**
+ * The memory specs for a memory owner, including the owner itself, the number 
of guaranteed buffers
+ * of the memory owner, etc.
+ */
+public class TieredStorageMemorySpec {
+
+    /** The memory use owner. */
+    private final Object owner;
+
+    /** The number of guaranteed buffers of this memory owner. */
+    private final int numGuaranteedBuffers;
+
+    public TieredStorageMemorySpec(Object owner, int numGuaranteedBuffers) {
+        this.owner = owner;
+        this.numGuaranteedBuffers = numGuaranteedBuffers;
+    }
+
+    public Object getOwner() {
+        return owner;
+    }
+
+    public int getNumGuaranteedBuffers() {
+        return numGuaranteedBuffers;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
new file mode 100644
index 00000000000..e4591bd0021
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageMemoryManagerImpl}. */
+public class TieredStorageMemoryManagerImplTest {
+
+    private static final int NETWORK_BUFFER_SIZE = 1024;
+
+    private static final int NUM_TOTAL_BUFFERS = 1000;
+
+    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+    private NetworkBufferPool globalPool;
+
+    private List<BufferBuilder> requestedBuffers;
+
+    private CompletableFuture<Void> hasReclaimBufferFinished;
+
+    private int reclaimBufferCounter;
+
+    @BeforeEach
+    void before() {
+        globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+        requestedBuffers = new ArrayList<>();
+        hasReclaimBufferFinished = new CompletableFuture<>();
+        reclaimBufferCounter = 0;
+    }
+
+    @AfterEach
+    void after() {
+        globalPool.destroy();
+    }
+
+    @Test
+    void testRequestAndRecycleBuffers() throws IOException {
+        int numBuffers = 1;
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(
+                        bufferPool,
+                        Collections.singletonList(new 
TieredStorageMemorySpec(this, 0)));
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
+        BufferBuilder builder = 
storageMemoryManager.requestBufferBlocking(this);
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
+        recycleBufferBuilder(builder);
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
+        storageMemoryManager.release();
+    }
+
+    @Test
+    void testGetMaxNonReclaimableBuffers() throws IOException {
+        int numBuffers = 10;
+        int numExclusive = 5;
+
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(
+                        numBuffers,
+                        Collections.singletonList(new 
TieredStorageMemorySpec(this, numExclusive)));
+
+        List<BufferBuilder> requestedBuffers = new ArrayList<>();
+        for (int i = 1; i <= numBuffers; i++) {
+            
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+            assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+                    .isEqualTo(numBuffers);
+            int numExpectedAvailable = numBuffers - i;
+            assertThat(
+                            
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+                                    - 
storageMemoryManager.numOwnerRequestedBuffer(this))
+                    .isEqualTo(numExpectedAvailable);
+        }
+
+        
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+        storageMemoryManager.release();
+    }
+
+    @Test
+    void testNumMaxNonReclaimableWhenOtherUseLessThanGuaranteed() throws 
IOException {
+        int numBuffers = 10;
+        int numExclusive = 4;
+
+        List<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<>();
+        Object otherUser = new Object();
+        storageMemorySpecs.add(new TieredStorageMemorySpec(this, 0));
+        storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser, 
numExclusive));
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(numBuffers, storageMemorySpecs);
+
+        List<BufferBuilder> requestedBuffers = new ArrayList<>();
+        assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+                .isEqualTo(numBuffers - numExclusive);
+        for (int i = 1; i <= numBuffers; i++) {
+            
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+            assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+                    .isEqualTo(numBuffers - numExclusive);
+            int numExpectedAvailable = numBuffers - i - numExclusive;
+            assertThat(
+                            
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+                                    - 
storageMemoryManager.numOwnerRequestedBuffer(this))
+                    .isEqualTo(numExpectedAvailable);
+        }
+
+        
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+        storageMemoryManager.release();
+    }
+
+    @Test
+    void testNumMaxNonReclaimableWhenOtherUseMoreThanGuaranteed() throws 
IOException {
+        int numBuffers = 10;
+        int numExclusive = 4;
+
+        List<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<>();
+        Object otherUser = new Object();
+        storageMemorySpecs.add(new TieredStorageMemorySpec(this, 0));
+        storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser, 
numExclusive));
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(numBuffers, storageMemorySpecs);
+
+        int numRequestedByOtherUser = numExclusive + 1;
+        for (int i = 0; i < numRequestedByOtherUser; i++) {
+            
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(otherUser));
+        }
+
+        assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+                .isEqualTo(numBuffers - numRequestedByOtherUser);
+        for (int i = 1; i <= numBuffers - numRequestedByOtherUser; i++) {
+            
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+            assertThat(storageMemoryManager.getMaxNonReclaimableBuffers(this))
+                    .isEqualTo(numBuffers - numRequestedByOtherUser);
+            int numExpectedAvailable = numBuffers - i - 
numRequestedByOtherUser;
+            assertThat(
+                            
storageMemoryManager.getMaxNonReclaimableBuffers(this)
+                                    - 
storageMemoryManager.numOwnerRequestedBuffer(this))
+                    .isEqualTo(numExpectedAvailable);
+        }
+        assertThat(storageMemoryManager.numOwnerRequestedBuffer(this))
+                .isEqualTo(numBuffers - numRequestedByOtherUser);
+        assertThat(storageMemoryManager.numOwnerRequestedBuffer(otherUser))
+                .isEqualTo(numRequestedByOtherUser);
+
+        
requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
+        storageMemoryManager.release();
+    }
+
+    @Test
+    @Timeout(60)
+    void testTriggerReclaimBuffers() throws IOException {
+        int numBuffers = 5;
+
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(
+                        numBuffers,
+                        Collections.singletonList(new 
TieredStorageMemorySpec(this, 0)));
+        
storageMemoryManager.listenBufferReclaimRequest(this::onBufferReclaimRequest);
+
+        int numBuffersBeforeTriggerReclaim = (int) (numBuffers * 
NUM_BUFFERS_TRIGGER_FLUSH_RATIO);
+        for (int i = 0; i < numBuffersBeforeTriggerReclaim; i++) {
+            
requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+        }
+
+        assertThat(reclaimBufferCounter).isEqualTo(0);
+        
assertThat(requestedBuffers.size()).isEqualTo(numBuffersBeforeTriggerReclaim);
+        requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+        assertThatFuture(hasReclaimBufferFinished).eventuallySucceeds();
+        assertThat(reclaimBufferCounter).isEqualTo(1);
+        assertThat(requestedBuffers.size()).isEqualTo(1);
+        recycleRequestedBuffers();
+
+        storageMemoryManager.release();
+    }
+
+    @Test
+    void testReleaseBeforeRecyclingBuffers() throws IOException {
+        int numBuffers = 5;
+
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(
+                        numBuffers,
+                        Collections.singletonList(new 
TieredStorageMemorySpec(this, 0)));
+        requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+        
assertThatThrownBy(storageMemoryManager::release).isInstanceOf(IllegalStateException.class);
+        recycleRequestedBuffers();
+        storageMemoryManager.release();
+    }
+
+    @Test
+    void testLeakingBuffers() throws IOException {
+        int numBuffers = 10;
+
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                createStorageMemoryManager(
+                        numBuffers,
+                        Collections.singletonList(new 
TieredStorageMemorySpec(this, 0)));
+
+        requestedBuffers.add(storageMemoryManager.requestBufferBlocking(this));
+        assertThatThrownBy(storageMemoryManager::release)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Leaking buffers");
+        recycleRequestedBuffers();
+        storageMemoryManager.release();
+    }
+
+    public void onBufferReclaimRequest() {
+        reclaimBufferCounter++;
+        recycleRequestedBuffers();
+        hasReclaimBufferFinished.complete(null);
+    }
+
+    private void recycleRequestedBuffers() {
+        requestedBuffers.forEach(
+                builder -> {
+                    BufferConsumer bufferConsumer = 
builder.createBufferConsumer();
+                    Buffer buffer = bufferConsumer.build();
+                    buffer.getRecycler().recycle(buffer.getMemorySegment());
+                });
+        requestedBuffers.clear();
+    }
+
+    private TieredStorageMemoryManagerImpl createStorageMemoryManager(
+            int numBuffersInBufferPool, List<TieredStorageMemorySpec> 
storageMemorySpecs)
+            throws IOException {
+        BufferPool bufferPool =
+                globalPool.createBufferPool(numBuffersInBufferPool, 
numBuffersInBufferPool);
+        return createStorageMemoryManager(bufferPool, storageMemorySpecs);
+    }
+
+    private TieredStorageMemoryManagerImpl createStorageMemoryManager(
+            BufferPool bufferPool, List<TieredStorageMemorySpec> 
storageMemorySpecs) {
+        TieredStorageMemoryManagerImpl storageProducerMemoryManager =
+                new 
TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
+        storageProducerMemoryManager.setup(bufferPool, storageMemorySpecs);
+        return storageProducerMemoryManager;
+    }
+
+    private static void recycleBufferBuilder(BufferBuilder bufferBuilder) {
+        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
+        Buffer buffer = bufferConsumer.build();
+        NetworkBuffer networkBuffer =
+                new NetworkBuffer(
+                        buffer.getMemorySegment(), buffer.getRecycler(), 
buffer.getDataType());
+        networkBuffer.recycleBuffer();
+    }
+}


Reply via email to