xintongsong commented on code in PR #22352:
URL: https://github.com/apache/flink/pull/22352#discussion_r1193288010


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent for these owners.
+ *
+ * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types:
+ * long-term occupied memory which cannot be immediately released and short-term occupied memory
+ * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting
+ * for other operations to complete before releasing it, such as downstream consumption. On the
+ * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory
+ * recycling for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management
+ * across various layers. Instead of tracking the number of buffers utilized by individual users, it
+ * dynamically calculates a user's maximum guaranteed amount based on the current status of the
+ * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user,
+ * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is
+ * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers
+ * in the buffer pool - guaranteed amount of other users (excluding the current user).

Review Comment:
   This is a bit hard to understand. As far as I can tell, the memory manager never limits users' memory usage, so it may not need to know about the difference between long-term and short-term occupied memory.
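The following is a minimal sketch of the calculation the Javadoc describes. It is written as a plain function to show that the manager only has to compute a number per owner and does not have to enforce it. The sketch assumes that "left buffers in the buffer pool" means the pool's current size. `NonReclaimableQuota` and its methods are hypothetical names and are not part of this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating the per-owner calculation described in the Javadoc. */
final class NonReclaimableQuota {

    /** Guaranteed buffers declared by each owner (e.g. taken from its memory spec). */
    private final Map<Object, Integer> guaranteedBuffers = new HashMap<>();

    /** Sum of all owners' guaranteed buffers, kept in sync with the map. */
    private int totalGuaranteedBuffers;

    void register(Object owner, int numGuaranteedBuffers) {
        if (guaranteedBuffers.putIfAbsent(owner, numGuaranteedBuffers) != null) {
            throw new IllegalStateException("Duplicated memory spec.");
        }
        totalGuaranteedBuffers += numGuaranteedBuffers;
    }

    /**
     * Pool size minus the buffers guaranteed to every other owner. The result is advisory:
     * nothing in this class prevents an owner from requesting more.
     */
    int maxNonReclaimableBuffers(Object owner, int poolSize) {
        Integer own = guaranteedBuffers.get(owner);
        if (own == null) {
            throw new IllegalArgumentException("Unknown owner: " + owner);
        }
        int guaranteedToOthers = totalGuaranteedBuffers - own;
        // Clamp at zero when the pool is smaller than the other owners' guarantees.
        return Math.max(0, poolSize - guaranteedToOthers);
    }
}
```

For example, take a 10-buffer pool with two owners guaranteed 2 and 3 buffers. The first owner gets 10 - 3 = 7, and the second gets 10 - 2 = 8. Clamping at zero is a choice made for this sketch only.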



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent for these owners.
+ *
+ * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types:
+ * long-term occupied memory which cannot be immediately released and short-term occupied memory
+ * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting
+ * for other operations to complete before releasing it, such as downstream consumption. On the
+ * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory
+ * recycling for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management
+ * across various layers. Instead of tracking the number of buffers utilized by individual users, it
+ * dynamically calculates a user's maximum guaranteed amount based on the current status of the
+ * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user,
+ * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is
+ * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers
+ * in the buffer pool - guaranteed amount of other users (excluding the current user).
+ */
+public interface TieredStorageMemoryManager {
+
+    /**
+     * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link
+     * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each
+     * tiered storage's memory requirement specs.
+     *
+     * @param bufferPool the local buffer pool
+     * @param storageMemorySpecs the memory specs for different tiered storages
+     */
+    void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs);
+
+    /**
+     * Register a buffer flush call back to flush and recycle all the memory in the user when
+     * needed.
+     *
+     * <p>When the left buffers in the {@link BufferPool} are not enough, {@link
+     * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback.
+     *
+     * @param userBufferFlushCallback the buffer flush call back of the memory user
+     */
+    void registerBufferFlushCallback(Runnable userBufferFlushCallback);
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner.
+     *
+     * @return the requested buffer
+     */
+    BufferBuilder requestBufferBlocking();
+
+    /**
+     * Return the available buffers for the owner.
+     *
+     * <p>Note that the available buffers are calculated dynamically based on some conditions, for
+     * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the
+     * owner, etc.
+     *
+     * <p>If a user is a long-term occupied memory user, the {@link TieredStorageMemoryManager} does
+     * not limit the user's memory usage, while if a user is a short-term occupied memory user, the
+     * max allowed buffers of the user is the left buffers in the buffer pool - guaranteed amount of
+     * other users (excluding the current user).

Review Comment:
   These are internal implementation details.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;

Review Comment:
   Why do we need a periodic flush when we already check whenever buffers are requested?
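For reference, the condition under discussion can be sketched as a pure predicate. The sketch assumes that the trigger compares the number of requested buffers against `numBuffersTriggerFlushRatio` times the pool size, as the field's Javadoc suggests. The body of `checkShouldFlushCachedBuffers` is not shown in this review, and `ReclaimThreshold` is a hypothetical name. The inputs only change when a buffer is requested or recycled, so evaluating the predicate on each request would observe every upward crossing. This holds provided the pool size does not change between requests.

```java
/** Hypothetical helper mirroring the role of numBuffersTriggerFlushRatio. */
final class ReclaimThreshold {

    private ReclaimThreshold() {}

    /**
     * Returns true once the requested buffers reach the given fraction of the pool size.
     *
     * @param numRequestedBuffers buffers currently handed out by the pool
     * @param poolSize current number of buffers in the pool (must be positive)
     * @param triggerRatio fraction of the pool at which owners are asked to reclaim
     */
    static boolean shouldTriggerReclaim(
            int numRequestedBuffers, int poolSize, float triggerRatio) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
        }
        return numRequestedBuffers >= triggerRatio * poolSize;
    }
}
```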



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent for these owners.
+ *
+ * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types:
+ * long-term occupied memory which cannot be immediately released and short-term occupied memory
+ * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting
+ * for other operations to complete before releasing it, such as downstream consumption. On the
+ * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory
+ * recycling for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management
+ * across various layers. Instead of tracking the number of buffers utilized by individual users, it
+ * dynamically calculates a user's maximum guaranteed amount based on the current status of the
+ * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user,
+ * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is
+ * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers
+ * in the buffer pool - guaranteed amount of other users (excluding the current user).
+ */
+public interface TieredStorageMemoryManager {
+
+    /**
+     * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link
+     * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each
+     * tiered storage's memory requirement specs.
+     *
+     * @param bufferPool the local buffer pool
+     * @param storageMemorySpecs the memory specs for different tiered storages
+     */
+    void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs);
+
+    /**
+     * Register a buffer flush call back to flush and recycle all the memory in the user when
+     * needed.
+     *
+     * <p>When the left buffers in the {@link BufferPool} are not enough, {@link
+     * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback.
+     *
+     * @param userBufferFlushCallback the buffer flush call back of the memory user
+     */
+    void registerBufferFlushCallback(Runnable userBufferFlushCallback);
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner.
+     *
+     * @return the requested buffer
+     */
+    BufferBuilder requestBufferBlocking();
+
+    /**
+     * Return the available buffers for the owner.
+     *
+     * <p>Note that the available buffers are calculated dynamically based on some conditions, for
+     * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the
+     * owner, etc.
+     *
+     * <p>If a user is a long-term occupied memory user, the {@link TieredStorageMemoryManager} does
+     * not limit the user's memory usage, while if a user is a short-term occupied memory user, the
+     * max allowed buffers of the user is the left buffers in the buffer pool - guaranteed amount of
+     * other users (excluding the current user).
+     */
+    int getMaxAllowedBuffers(Object owner);

Review Comment:
   ```suggestion
       int getMaxNonReclaimableBuffers(Object owner);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent for these owners.
+ *
+ * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types:
+ * long-term occupied memory which cannot be immediately released and short-term occupied memory
+ * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting
+ * for other operations to complete before releasing it, such as downstream consumption. On the
+ * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory
+ * recycling for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management
+ * across various layers. Instead of tracking the number of buffers utilized by individual users, it
+ * dynamically calculates a user's maximum guaranteed amount based on the current status of the
+ * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user,
+ * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is
+ * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers
+ * in the buffer pool - guaranteed amount of other users (excluding the current user).
+ */
+public interface TieredStorageMemoryManager {
+
+    /**
+     * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link
+     * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each
+     * tiered storage's memory requirement specs.
+     *
+     * @param bufferPool the local buffer pool
+     * @param storageMemorySpecs the memory specs for different tiered storages
+     */
+    void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs);
+
+    /**
+     * Register a buffer flush call back to flush and recycle all the memory in the user when
+     * needed.
+     *
+     * <p>When the left buffers in the {@link BufferPool} are not enough, {@link
+     * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback.
+     *
+     * @param userBufferFlushCallback the buffer flush call back of the memory user
+     */
+    void registerBufferFlushCallback(Runnable userBufferFlushCallback);

Review Comment:
   ```suggestion
       void listenBufferReclaimRequest(Runnable onBufferReclaimRequest);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+import java.util.List;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link
+ * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator,
+ * etc. Note that the logic for requesting and recycling buffers is consistent for these owners.
+ *
+ * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types:
+ * long-term occupied memory which cannot be immediately released and short-term occupied memory
+ * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting
+ * for other operations to complete before releasing it, such as downstream consumption. On the
+ * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory
+ * recycling for tasks such as flushing memory to disk or remote storage.
+ *
+ * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management
+ * across various layers. Instead of tracking the number of buffers utilized by individual users, it
+ * dynamically calculates a user's maximum guaranteed amount based on the current status of the
+ * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user,
+ * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is
+ * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers
+ * in the buffer pool - guaranteed amount of other users (excluding the current user).
+ */
+public interface TieredStorageMemoryManager {
+
+    /**
+     * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link
+     * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each
+     * tiered storage's memory requirement specs.
+     *
+     * @param bufferPool the local buffer pool
+     * @param storageMemorySpecs the memory specs for different tiered storages
+     */
+    void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs);
+
+    /**
+     * Register a buffer flush call back to flush and recycle all the memory in the user when
+     * needed.
+     *
+     * <p>When the left buffers in the {@link BufferPool} are not enough, {@link
+     * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback.
+     *
+     * @param userBufferFlushCallback the buffer flush call back of the memory user
+     */
+    void registerBufferFlushCallback(Runnable userBufferFlushCallback);
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner.
+     *
+     * @return the requested buffer
+     */
+    BufferBuilder requestBufferBlocking();
+
+    /**
+     * Return the available buffers for the owner.
+     *
+     * <p>Note that the available buffers are calculated dynamically based on some conditions, for
+     * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the
+     * owner, etc.

Review Comment:
   So the caller should always check before requesting non-reclaimable buffers.
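The following is a minimal sketch of that caller-side pattern. It assumes the method is renamed to `getMaxNonReclaimableBuffers`, as suggested above. The owner class, its constructor arguments, and the recycling callback are hypothetical. The suppliers stand in for the memory manager so that the sketch runs on its own.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

/** Hypothetical owner that checks its limit before requesting another non-reclaimable buffer. */
final class QuotaCheckingOwner<B> {

    /** For example {@code () -> memoryManager.getMaxNonReclaimableBuffers(this)}. */
    private final IntSupplier maxNonReclaimableBuffers;

    /** For example {@code memoryManager::requestBufferBlocking}. */
    private final Supplier<B> requestBuffer;

    /** Returns a buffer to the pool once it has been flushed. */
    private final Consumer<B> recycleBuffer;

    private final Deque<B> heldBuffers = new ArrayDeque<>();

    QuotaCheckingOwner(
            IntSupplier maxNonReclaimableBuffers,
            Supplier<B> requestBuffer,
            Consumer<B> recycleBuffer) {
        this.maxNonReclaimableBuffers = maxNonReclaimableBuffers;
        this.requestBuffer = requestBuffer;
        this.recycleBuffer = recycleBuffer;
    }

    /** Checks the limit first and releases held buffers if it has already been reached. */
    B acquire() {
        if (heldBuffers.size() >= maxNonReclaimableBuffers.getAsInt()) {
            releaseHeldBuffers();
        }
        B buffer = requestBuffer.get();
        heldBuffers.addLast(buffer);
        return buffer;
    }

    int numHeldBuffers() {
        return heldBuffers.size();
    }

    /** Stand-in for flushing held buffers to disk or remote storage, then recycling them. */
    private void releaseHeldBuffers() {
        while (!heldBuffers.isEmpty()) {
            recycleBuffer.accept(heldBuffers.pollFirst());
        }
    }
}
```

The limit is re-read on every `acquire()` because, per the Javadoc above, it is calculated dynamically.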



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;
+
+    /** The number of requested buffers from {@link BufferPool}. */
+    private final AtomicInteger numRequestedBuffers;
+
+    /** A thread to check whether to flush buffers in each tiered storage. */
+    private ScheduledExecutorService executor;
+
+    /** The total number of guaranteed buffers for all tiered storages. */
+    private int numTotalGuaranteedBuffers;
+
+    /** The buffer pool where the buffer is requested or recyceld. */
+    private BufferPool bufferPool;
+
+    /**
+     * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before
+     * setting up, this field is false.
+     *
+     * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running
+     * state should be checked.
+     */
+    private boolean isRunning;
+
+    /**
+     * The constructor of the {@link TieredStorageMemoryManagerImpl}.
+     *
+     * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered
+     *     storage to flush buffers
+     * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker
+     *     thread
+     */
+    public TieredStorageMemoryManagerImpl(
+            float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) {
+        this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio;
+        this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker;
+        this.tieredMemorySpecs = new HashMap<>();
+        this.numRequestedBuffers = new AtomicInteger(0);
+        this.bufferFlushCallbacks = new ArrayList<>();
+        this.isRunning = false;
+    }
+
+    @Override
+    public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) {
+        this.bufferPool = bufferPool;
+        for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) {
+            checkState(
+                    !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
+                    "Duplicated memory spec.");
+            tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+            numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers();
+        }
+
+        if (shouldStartBufferFlushChecker) {
+            this.executor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ThreadFactoryBuilder()
+                                    .setNameFormat("buffer flush checker")
+                                    .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                                    .build());
+            this.executor.scheduleWithFixedDelay(
+                    this::checkShouldFlushCachedBuffers,
+                    DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS,
+                    DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        this.isRunning = true;
+    }
+
+    @Override
+    public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) {
+        bufferFlushCallbacks.add(userBufferFlushCallBack);
+    }
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The
+     * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and
+     * only record the total number of requested buffers. If the buffers in the {@link BufferPool}
+     * is not enough, this will trigger each tiered storage to flush buffers as much as possible.
+     *
+     * @return the requested buffer
+     */
+    @Override
+    public BufferBuilder requestBufferBlocking() {
+        checkIsRunning();
+
+        MemorySegment requestedBuffer = null;
+        try {
+            requestedBuffer = bufferPool.requestMemorySegmentBlocking();
+        } catch (Throwable throwable) {
+            ExceptionUtils.rethrow(throwable, "Failed to request memory segments.");
+        }
+        numRequestedBuffers.incrementAndGet();
+        checkShouldFlushCachedBuffers();

Review Comment:
   Shouldn't `checkShouldFlushCachedBuffers()` be called before requesting the buffer?
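   A minimal sketch of the reordering, assuming a simplified pool: a `java.util.concurrent.Semaphore` stands in for Flink's `BufferPool`, and `FlushFirstBufferRequester` and its methods are hypothetical names, not the PR's API. The flush condition is evaluated for the buffer that is about to be taken, so the callbacks get a chance to return buffers before the thread blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified requester; a Semaphore stands in for Flink's BufferPool. */
class FlushFirstBufferRequester {
    private final int numTotalBuffers;
    private final float flushTriggerRatio;
    private final Semaphore pool;
    private final AtomicInteger numRequestedBuffers = new AtomicInteger();
    private final List<Runnable> flushCallbacks = new ArrayList<>();

    FlushFirstBufferRequester(int numTotalBuffers, float flushTriggerRatio) {
        this.numTotalBuffers = numTotalBuffers;
        this.flushTriggerRatio = flushTriggerRatio;
        this.pool = new Semaphore(numTotalBuffers);
    }

    void registerFlushCallback(Runnable callback) {
        flushCallbacks.add(callback);
    }

    /** Runs the flush check for the buffer about to be taken, then blocks on the pool. */
    void requestBufferBlocking() {
        int numAfterRequest = numRequestedBuffers.get() + 1;
        if (numAfterRequest >= numTotalBuffers
                || (double) numAfterRequest / numTotalBuffers >= flushTriggerRatio) {
            // Flushing first gives the callbacks a chance to return buffers to the pool.
            flushCallbacks.forEach(Runnable::run);
        }
        pool.acquireUninterruptibly();
        numRequestedBuffers.incrementAndGet();
    }

    void recycleBuffer() {
        numRequestedBuffers.decrementAndGet();
        pool.release();
    }

    int getNumRequestedBuffers() {
        return numRequestedBuffers.get();
    }
}
```

   With a one-buffer pool and a flush callback that recycles a held buffer, the second `requestBufferBlocking()` call returns instead of blocking. With the check placed after the acquire, that call would depend on another thread (such as the periodic checker) to trigger the flush. The check-then-acquire sequence is not atomic, so under concurrency the flush may still fire slightly early or late.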



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;

Review Comment:
   These constants can be `private`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;
+
+    /** The number of requested buffers from {@link BufferPool}. */
+    private final AtomicInteger numRequestedBuffers;
+
+    /** A thread to check whether to flush buffers in each tiered storage. */
+    private ScheduledExecutorService executor;
+
+    /** The total number of guaranteed buffers for all tiered storages. */
+    private int numTotalGuaranteedBuffers;
+
+    /** The buffer pool where the buffer is requested or recyceld. */
+    private BufferPool bufferPool;
+
+    /**
+     * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before
+     * setting up, this field is false.
+     *
+     * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running
+     * state should be checked.
+     */
+    private boolean isRunning;
+
+    /**
+     * The constructor of the {@link TieredStorageMemoryManagerImpl}.
+     *
+     * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered
+     *     storage to flush buffers
+     * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker
+     *     thread
+     */
+    public TieredStorageMemoryManagerImpl(
+            float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) {
+        this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio;
+        this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker;
+        this.tieredMemorySpecs = new HashMap<>();
+        this.numRequestedBuffers = new AtomicInteger(0);
+        this.bufferFlushCallbacks = new ArrayList<>();
+        this.isRunning = false;
+    }
+
+    @Override
+    public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) {
+        this.bufferPool = bufferPool;
+        for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) {
+            checkState(
+                    !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
+                    "Duplicated memory spec.");
+            tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+            numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers();
+        }
+
+        if (shouldStartBufferFlushChecker) {
+            this.executor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ThreadFactoryBuilder()
+                                    .setNameFormat("buffer flush checker")
+                                    .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                                    .build());
+            this.executor.scheduleWithFixedDelay(
+                    this::checkShouldFlushCachedBuffers,
+                    DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS,
+                    DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        this.isRunning = true;
+    }
+
+    @Override
+    public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) {
+        bufferFlushCallbacks.add(userBufferFlushCallBack);
+    }
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The
+     * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and
+     * only record the total number of requested buffers. If the buffers in the {@link BufferPool}
+     * is not enough, this will trigger each tiered storage to flush buffers as much as possible.
+     *
+     * @return the requested buffer
+     */
+    @Override
+    public BufferBuilder requestBufferBlocking() {
+        checkIsRunning();
+
+        MemorySegment requestedBuffer = null;
+        try {
+            requestedBuffer = bufferPool.requestMemorySegmentBlocking();
+        } catch (Throwable throwable) {
+            ExceptionUtils.rethrow(throwable, "Failed to request memory segments.");
+        }
+        numRequestedBuffers.incrementAndGet();
+        checkShouldFlushCachedBuffers();
+        return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer);
+    }
+
+    @Override
+    public int getMaxAllowedBuffers(Object owner) {
+        checkIsRunning();
+
+        TieredStorageMemorySpec ownerMemorySpec = checkNotNull(tieredMemorySpecs.get(owner));
+        if (ownerMemorySpec.isMemoryReleasable()) {
+            return Integer.MAX_VALUE;
+        } else {

Review Comment:
   We may not need the releasable / unreleasable distinction at all.
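   For illustration, a sketch of what dropping the distinction could look like: every owner is bounded by the same arithmetic the unreleasable branch already uses, i.e. pool size minus requested buffers minus all guaranteed buffers plus the owner's own guarantee. `UniformBufferQuota` and its methods are hypothetical; owners are plain `Object` keys, as in `tieredMemorySpecs`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one quota rule for every owner, without a releasable branch. */
class UniformBufferQuota {
    private final int numPoolBuffers;
    private final Map<Object, Integer> guaranteedBuffersByOwner = new HashMap<>();
    private int numTotalGuaranteedBuffers;
    private int numRequestedBuffers;

    UniformBufferQuota(int numPoolBuffers) {
        this.numPoolBuffers = numPoolBuffers;
    }

    void registerOwner(Object owner, int numGuaranteedBuffers) {
        if (guaranteedBuffersByOwner.putIfAbsent(owner, numGuaranteedBuffers) != null) {
            throw new IllegalStateException("Duplicated memory spec.");
        }
        numTotalGuaranteedBuffers += numGuaranteedBuffers;
    }

    void onBufferRequested() {
        numRequestedBuffers++;
    }

    void onBufferRecycled() {
        numRequestedBuffers--;
    }

    /** Same arithmetic as the unreleasable branch in the diff, applied to every owner. */
    int getMaxAllowedBuffers(Object owner) {
        Integer ownerGuaranteed = guaranteedBuffersByOwner.get(owner);
        if (ownerGuaranteed == null) {
            throw new IllegalArgumentException("Unregistered owner: " + owner);
        }
        return numPoolBuffers - numRequestedBuffers - numTotalGuaranteedBuffers + ownerGuaranteed;
    }
}
```

   With a 100-buffer pool, two owners guaranteeing 10 and 20 buffers, and 30 buffers requested, the owners may use up to 50 and 60 buffers respectively.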



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;

Review Comment:
   It seems the periodic flush is not part of memory management.
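   One possible split, sketched under assumptions: the periodic check moves into its own component that only receives a `BooleanSupplier` (whether usage crossed the threshold) and a `Runnable` (run the flush callbacks), so the memory manager no longer owns a thread. `PeriodicFlushChecker` is a hypothetical name, and unlike the diff it uses a plain daemon thread factory instead of Guava's `ThreadFactoryBuilder` with `FatalExitExceptionHandler`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical component that owns the periodic flush check, outside the memory manager. */
class PeriodicFlushChecker implements AutoCloseable {
    private final BooleanSupplier shouldFlush;
    private final Runnable flushAction;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "buffer flush checker");
                        thread.setDaemon(true);
                        return thread;
                    });

    PeriodicFlushChecker(BooleanSupplier shouldFlush, Runnable flushAction) {
        this.shouldFlush = shouldFlush;
        this.flushAction = flushAction;
    }

    void start(long initialDelayMs, long periodMs) {
        executor.scheduleWithFixedDelay(
                this::checkAndFlush, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /** A single check; the scheduler calls this, and it can also be invoked directly. */
    void checkAndFlush() {
        if (shouldFlush.getAsBoolean()) {
            flushAction.run();
        }
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

   In the downstream case no checker would be created at all, which would remove the need for the `shouldStartBufferFlushChecker` flag in the memory manager.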



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;
+
+    /** The number of requested buffers from {@link BufferPool}. */
+    private final AtomicInteger numRequestedBuffers;
+
+    /** A thread to check whether to flush buffers in each tiered storage. */
+    private ScheduledExecutorService executor;
+
+    /** The total number of guaranteed buffers for all tiered storages. */
+    private int numTotalGuaranteedBuffers;
+
+    /** The buffer pool where the buffer is requested or recyceld. */
+    private BufferPool bufferPool;
+
+    /**
+     * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before
+     * setting up, this field is false.
+     *
+     * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running
+     * state should be checked.
+     */
+    private boolean isRunning;

Review Comment:
   For `isRunning`, we should set it to `false` in `release()`, or rename it to `initialized`.
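   A minimal sketch of the first option, assuming the flag may be read from other threads (hence `volatile`). `MemoryManagerLifecycle` is a hypothetical stand-in that keeps only the lifecycle logic of the memory manager.

```java
/** Hypothetical sketch of the lifecycle flag handling suggested above. */
class MemoryManagerLifecycle {
    private volatile boolean isRunning;

    void setup() {
        if (isRunning) {
            throw new IllegalStateException("Already set up.");
        }
        isRunning = true;
    }

    void release() {
        // Clear the flag so later requests fail fast instead of touching a released pool.
        isRunning = false;
    }

    void checkIsRunning() {
        if (!isRunning) {
            throw new IllegalStateException("The memory manager is not running.");
        }
    }

    boolean isRunning() {
        return isRunning;
    }
}
```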



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking flush. */
+    public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10;
+
+    /** Check flush period. */
+    public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory user owner. */
+    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
+
+    /** The registered callbacks to flush the buffers in the registered tiered storages. */
+    private final List<Runnable> bufferFlushCallbacks;
+
+    /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */
+    private final float numBuffersTriggerFlushRatio;
+
+    /**
+     * Indicate whether to start the buffer flush checker thread. If the memory manager is used in
+     * downstream, the field will be false because no buffer flush checker is needed.
+     */
+    private final boolean shouldStartBufferFlushChecker;
+
+    /** The number of requested buffers from {@link BufferPool}. */
+    private final AtomicInteger numRequestedBuffers;
+
+    /** A thread to check whether to flush buffers in each tiered storage. */
+    private ScheduledExecutorService executor;
+
+    /** The total number of guaranteed buffers for all tiered storages. */
+    private int numTotalGuaranteedBuffers;
+
+    /** The buffer pool where the buffer is requested or recyceld. */
+    private BufferPool bufferPool;
+
+    /**
+     * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before
+     * setting up, this field is false.
+     *
+     * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running
+     * state should be checked.
+     */
+    private boolean isRunning;
+
+    /**
+     * The constructor of the {@link TieredStorageMemoryManagerImpl}.
+     *
+     * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered
+     *     storage to flush buffers
+     * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker
+     *     thread
+     */
+    public TieredStorageMemoryManagerImpl(
+            float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) {
+        this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio;
+        this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker;
+        this.tieredMemorySpecs = new HashMap<>();
+        this.numRequestedBuffers = new AtomicInteger(0);
+        this.bufferFlushCallbacks = new ArrayList<>();
+        this.isRunning = false;
+    }
+
+    @Override
+    public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) {
+        this.bufferPool = bufferPool;
+        for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) {
+            checkState(
+                    !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
+                    "Duplicated memory spec.");
+            tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+            numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers();
+        }
+
+        if (shouldStartBufferFlushChecker) {
+            this.executor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ThreadFactoryBuilder()
+                                    .setNameFormat("buffer flush checker")
+                                    .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                                    .build());
+            this.executor.scheduleWithFixedDelay(
+                    this::checkShouldFlushCachedBuffers,
+                    DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS,
+                    DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        this.isRunning = true;
+    }
+
+    @Override
+    public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) {
+        bufferFlushCallbacks.add(userBufferFlushCallBack);
+    }
+
+    /**
+     * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The
+     * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and
+     * only record the total number of requested buffers. If the buffers in the {@link BufferPool}
+     * is not enough, this will trigger each tiered storage to flush buffers as much as possible.
+     *
+     * @return the requested buffer
+     */
+    @Override
+    public BufferBuilder requestBufferBlocking() {
+        checkIsRunning();
+
+        MemorySegment requestedBuffer = null;
+        try {
+            requestedBuffer = bufferPool.requestMemorySegmentBlocking();
+        } catch (Throwable throwable) {
+            ExceptionUtils.rethrow(throwable, "Failed to request memory segments.");
+        }
+        numRequestedBuffers.incrementAndGet();
+        checkShouldFlushCachedBuffers();
+        return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer);
+    }
+
+    @Override
+    public int getMaxAllowedBuffers(Object owner) {
+        checkIsRunning();
+
+        TieredStorageMemorySpec ownerMemorySpec = checkNotNull(tieredMemorySpecs.get(owner));
+        if (ownerMemorySpec.isMemoryReleasable()) {
+            return Integer.MAX_VALUE;
+        } else {
+            int ownerGuaranteedBuffers = ownerMemorySpec.getNumGuaranteedBuffers();
+            return bufferPool.getNumBuffers()
+                    - numRequestedBuffers.get()
+                    - numTotalGuaranteedBuffers
+                    + ownerGuaranteedBuffers;
+        }
+    }
+
+    @Override
+    public void release() {
+        checkState(numRequestedBuffers.get() == 0, "Leaking buffers.");
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                    throw new TimeoutException(
+                            "Timeout for shutting down the cache flush executor.");
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+        }
+    }
+
+    private void checkShouldFlushCachedBuffers() {
+        if (shouldFlushBuffers()) {
+            bufferFlushCallbacks.forEach(Runnable::run);
+        }
+    }
+
+    private boolean shouldFlushBuffers() {
+        synchronized (this) {
+            int numTotal = bufferPool.getNumBuffers();
+            int numRequested = numRequestedBuffers.get();
+            return numRequested >= numTotal
+                    || (numRequested * 1.0 / numTotal) >= numBuffersTriggerFlushRatio;
+        }

Review Comment:
   This does not guarantee atomicity unless all accesses to `bufferPool` and `numRequestedBuffers` are guarded by `this`.
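   A sketch of the first option, assuming the pool size can change at runtime (modeled by a hypothetical `onPoolResized`): every read and write of both inputs to the flush decision goes through the same monitor. `LockedBufferAccounting` is hypothetical. The blocking `requestMemorySegmentBlocking()` call itself should stay outside the lock, with `onBufferRequested()` called after it returns; otherwise a blocked requester would hold the monitor that recyclers need.

```java
/** Hypothetical sketch: both inputs of the flush decision are only touched under one monitor. */
class LockedBufferAccounting {
    private final float flushTriggerRatio;
    private int numTotalBuffers; // guarded by this
    private int numRequestedBuffers; // guarded by this

    LockedBufferAccounting(int numTotalBuffers, float flushTriggerRatio) {
        this.numTotalBuffers = numTotalBuffers;
        this.flushTriggerRatio = flushTriggerRatio;
    }

    /** Called after a segment has been obtained from the pool, never while blocking on it. */
    synchronized void onBufferRequested() {
        numRequestedBuffers++;
    }

    synchronized void onBufferRecycled() {
        numRequestedBuffers--;
    }

    /** Models a pool resize, which changes the denominator of the usage ratio. */
    synchronized void onPoolResized(int newNumTotalBuffers) {
        numTotalBuffers = newNumTotalBuffers;
    }

    synchronized boolean shouldFlushBuffers() {
        return numRequestedBuffers >= numTotalBuffers
                || (double) numRequestedBuffers / numTotalBuffers >= flushTriggerRatio;
    }
}
```

   Alternatively, if a slightly stale ratio is acceptable for deciding when to flush, the `synchronized` block could simply be dropped and the check treated as a heuristic.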


