xintongsong commented on code in PR #22352: URL: https://github.com/apache/flink/pull/22352#discussion_r1193288010
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; + +import java.util.List; + +/** + * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link + * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator, + * etc. Note that the logic for requesting and recycling buffers is consistent for these owners. + * + * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types: + * long-term occupied memory which cannot be immediately released and short-term occupied memory + * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting + * for other operations to complete before releasing it, such as downstream consumption. 
On the + * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory + * recycling for tasks such as flushing memory to disk or remote storage. + * + * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management + * across various layers. Instead of tracking the number of buffers utilized by individual users, it + * dynamically calculates a user's maximum guaranteed amount based on the current status of the + * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user, + * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is + * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers + * in the buffer pool - guaranteed amount of other users (excluding the current user). Review Comment: This is a bit hard to understand. I think the memory manager never limits users' memory usage, so it may not need to distinguish between long-term and short-term occupied memory. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; + +import java.util.List; + +/** + * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link + * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator, + * etc. Note that the logic for requesting and recycling buffers is consistent for these owners. + * + * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types: + * long-term occupied memory which cannot be immediately released and short-term occupied memory + * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting + * for other operations to complete before releasing it, such as downstream consumption. On the + * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory + * recycling for tasks such as flushing memory to disk or remote storage. + * + * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management + * across various layers. Instead of tracking the number of buffers utilized by individual users, it + * dynamically calculates a user's maximum guaranteed amount based on the current status of the + * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user, + * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is + * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers + * in the buffer pool - guaranteed amount of other users (excluding the current user). 
+ */ +public interface TieredStorageMemoryManager { + + /** + * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link + * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each + * tiered storage's memory requirement specs. + * + * @param bufferPool the local buffer pool + * @param storageMemorySpecs the memory specs for different tiered storages + */ + void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs); + + /** + * Register a buffer flush call back to flush and recycle all the memory in the user when + * needed. + * + * <p>When the left buffers in the {@link BufferPool} are not enough, {@link + * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback. + * + * @param userBufferFlushCallback the buffer flush call back of the memory user + */ + void registerBufferFlushCallback(Runnable userBufferFlushCallback); + + /** + * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner. + * + * @return the requested buffer + */ + BufferBuilder requestBufferBlocking(); + + /** + * Return the available buffers for the owner. + * + * <p>Note that the available buffers are calculated dynamically based on some conditions, for + * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the + * owner, etc. + * + * <p>If a user is a long-term occupied memory user, the {@link TieredStorageMemoryManager} does + * not limit the user's memory usage, while if a user is a short-term occupied memory user, the + * max allowed buffers of the user is the left buffers in the buffer pool - guaranteed amount of + * other users (excluding the current user). Review Comment: These are internal. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * 
The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. */ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; Review Comment: Why do we need periodic flushing when we already check whether to flush on every buffer request? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; + +import java.util.List; + +/** + * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link + * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator, + * etc. Note that the logic for requesting and recycling buffers is consistent for these owners. + * + * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types: + * long-term occupied memory which cannot be immediately released and short-term occupied memory + * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting + * for other operations to complete before releasing it, such as downstream consumption. On the + * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory + * recycling for tasks such as flushing memory to disk or remote storage. + * + * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management + * across various layers. Instead of tracking the number of buffers utilized by individual users, it + * dynamically calculates a user's maximum guaranteed amount based on the current status of the + * manager and the local buffer pool. 
Specifically, if a user is a long-term occupied memory user, + * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is + * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers + * in the buffer pool - guaranteed amount of other users (excluding the current user). + */ +public interface TieredStorageMemoryManager { + + /** + * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link + * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each + * tiered storage's memory requirement specs. + * + * @param bufferPool the local buffer pool + * @param storageMemorySpecs the memory specs for different tiered storages + */ + void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs); + + /** + * Register a buffer flush call back to flush and recycle all the memory in the user when + * needed. + * + * <p>When the left buffers in the {@link BufferPool} are not enough, {@link + * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback. + * + * @param userBufferFlushCallback the buffer flush call back of the memory user + */ + void registerBufferFlushCallback(Runnable userBufferFlushCallback); + + /** + * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner. + * + * @return the requested buffer + */ + BufferBuilder requestBufferBlocking(); + + /** + * Return the available buffers for the owner. + * + * <p>Note that the available buffers are calculated dynamically based on some conditions, for + * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the + * owner, etc. 
+ * + * <p>If a user is a long-term occupied memory user, the {@link TieredStorageMemoryManager} does + * not limit the user's memory usage, while if a user is a short-term occupied memory user, the + * max allowed buffers of the user is the left buffers in the buffer pool - guaranteed amount of + * other users (excluding the current user). + */ + int getMaxAllowedBuffers(Object owner); Review Comment: ```suggestion int getMaxNonReclaimableBuffers(Object owner); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; + +import java.util.List; + +/** + * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link + * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator, + * etc. 
Note that the logic for requesting and recycling buffers is consistent for these owners. + * + * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types: + * long-term occupied memory which cannot be immediately released and short-term occupied memory + * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting + * for other operations to complete before releasing it, such as downstream consumption. On the + * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory + * recycling for tasks such as flushing memory to disk or remote storage. + * + * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management + * across various layers. Instead of tracking the number of buffers utilized by individual users, it + * dynamically calculates a user's maximum guaranteed amount based on the current status of the + * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user, + * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is + * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers + * in the buffer pool - guaranteed amount of other users (excluding the current user). + */ +public interface TieredStorageMemoryManager { + + /** + * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link + * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each + * tiered storage's memory requirement specs. + * + * @param bufferPool the local buffer pool + * @param storageMemorySpecs the memory specs for different tiered storages + */ + void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs); + + /** + * Register a buffer flush call back to flush and recycle all the memory in the user when + * needed. 
+ * + * <p>When the left buffers in the {@link BufferPool} are not enough, {@link + * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback. + * + * @param userBufferFlushCallback the buffer flush call back of the memory user + */ + void registerBufferFlushCallback(Runnable userBufferFlushCallback); Review Comment: ```suggestion void listenBufferReclaimRequest(Runnable onBufferReclaimRequest); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; + +import java.util.List; + +/** + * The {@link TieredStorageMemoryManager} is to request or recycle buffers from {@link + * LocalBufferPool} for different memory owners, for example, the tiers, the buffer accumulator, + * etc. 
Note that the logic for requesting and recycling buffers is consistent for these owners. + * + * <p>The memory managed by {@link TieredStorageMemoryManager} is categorized into two types: + * long-term occupied memory which cannot be immediately released and short-term occupied memory + * which can be reclaimed quickly and safely. Long-term occupied memory usage necessitates waiting + * for other operations to complete before releasing it, such as downstream consumption. On the + * other hand, short-term occupied memory can be freed up at any time, enabling rapid memory + * recycling for tasks such as flushing memory to disk or remote storage. + * + * <p>This {@link TieredStorageMemoryManager} aim to streamline and harmonize memory management + * across various layers. Instead of tracking the number of buffers utilized by individual users, it + * dynamically calculates a user's maximum guaranteed amount based on the current status of the + * manager and the local buffer pool. Specifically, if a user is a long-term occupied memory user, + * the {@link TieredStorageMemoryManager} does not limit the user's memory usage, while if a user is + * a short-term occupied memory user, the current guaranteed buffers of the user is the left buffers + * in the buffer pool - guaranteed amount of other users (excluding the current user). + */ +public interface TieredStorageMemoryManager { + + /** + * Setup the {@link TieredStorageMemoryManager}. When setting up the manager, the {@link + * TieredStorageMemorySpec}s for different tiered storages should be ready to indicate each + * tiered storage's memory requirement specs. + * + * @param bufferPool the local buffer pool + * @param storageMemorySpecs the memory specs for different tiered storages + */ + void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs); + + /** + * Register a buffer flush call back to flush and recycle all the memory in the user when + * needed. 
+ * + * <p>When the left buffers in the {@link BufferPool} are not enough, {@link + * TieredStorageMemoryManager} will flush the buffers of the user with the registered callback. + * + * @param userBufferFlushCallback the buffer flush call back of the memory user + */ + void registerBufferFlushCallback(Runnable userBufferFlushCallback); + + /** + * Request a {@link BufferBuilder} instance from {@link LocalBufferPool} for a specific owner. + * + * @return the requested buffer + */ + BufferBuilder requestBufferBlocking(); + + /** + * Return the available buffers for the owner. + * + * <p>Note that the available buffers are calculated dynamically based on some conditions, for + * example, the state of the {@link BufferPool}, the {@link TieredStorageMemorySpec} of the + * owner, etc. Review Comment: So the caller should always check before requesting non-reclaimable buffers. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. 
*/ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; + + /** The number of requested buffers from {@link BufferPool}. */ + private final AtomicInteger numRequestedBuffers; + + /** A thread to check whether to flush buffers in each tiered storage. */ + private ScheduledExecutorService executor; + + /** The total number of guaranteed buffers for all tiered storages. */ + private int numTotalGuaranteedBuffers; + + /** The buffer pool where the buffer is requested or recyceld. */ + private BufferPool bufferPool; + + /** + * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before + * setting up, this field is false. + * + * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running + * state should be checked. + */ + private boolean isRunning; + + /** + * The constructor of the {@link TieredStorageMemoryManagerImpl}. 
+ * + * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered + * storage to flush buffers + * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker + * thread + */ + public TieredStorageMemoryManagerImpl( + float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker; + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + this.bufferFlushCallbacks = new ArrayList<>(); + this.isRunning = false; + } + + @Override + public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) { + this.bufferPool = bufferPool; + for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers(); + } + + if (shouldStartBufferFlushChecker) { + this.executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("buffer flush checker") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + this.executor.scheduleWithFixedDelay( + this::checkShouldFlushCachedBuffers, + DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS, + DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS, + TimeUnit.MILLISECONDS); + } + + this.isRunning = true; + } + + @Override + public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) { + bufferFlushCallbacks.add(userBufferFlushCallBack); + } + + /** + * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The + * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and + * only record the total number of requested buffers. 
If the buffers in the {@link BufferPool} + * is not enough, this will trigger each tiered storage to flush buffers as much as possible. + * + * @return the requested buffer + */ + @Override + public BufferBuilder requestBufferBlocking() { + checkIsRunning(); + + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + checkShouldFlushCachedBuffers(); Review Comment: Shouldn't this happen before requesting? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; Review Comment: These constants should be `private`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. 
+ * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. */ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; + + /** The number of requested buffers from {@link BufferPool}. */ + private final AtomicInteger numRequestedBuffers; + + /** A thread to check whether to flush buffers in each tiered storage. */ + private ScheduledExecutorService executor; + + /** The total number of guaranteed buffers for all tiered storages. */ + private int numTotalGuaranteedBuffers; + + /** The buffer pool where the buffer is requested or recyceld. */ + private BufferPool bufferPool; + + /** + * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before + * setting up, this field is false. + * + * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running + * state should be checked. + */ + private boolean isRunning; + + /** + * The constructor of the {@link TieredStorageMemoryManagerImpl}. 
+ * + * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered + * storage to flush buffers + * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker + * thread + */ + public TieredStorageMemoryManagerImpl( + float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker; + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + this.bufferFlushCallbacks = new ArrayList<>(); + this.isRunning = false; + } + + @Override + public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) { + this.bufferPool = bufferPool; + for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers(); + } + + if (shouldStartBufferFlushChecker) { + this.executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("buffer flush checker") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + this.executor.scheduleWithFixedDelay( + this::checkShouldFlushCachedBuffers, + DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS, + DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS, + TimeUnit.MILLISECONDS); + } + + this.isRunning = true; + } + + @Override + public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) { + bufferFlushCallbacks.add(userBufferFlushCallBack); + } + + /** + * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The + * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and + * only record the total number of requested buffers. 
If the buffers in the {@link BufferPool} + * is not enough, this will trigger each tiered storage to flush buffers as much as possible. + * + * @return the requested buffer + */ + @Override + public BufferBuilder requestBufferBlocking() { + checkIsRunning(); + + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + checkShouldFlushCachedBuffers(); + return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer); + } + + @Override + public int getMaxAllowedBuffers(Object owner) { + checkIsRunning(); + + TieredStorageMemorySpec ownerMemorySpec = checkNotNull(tieredMemorySpecs.get(owner)); + if (ownerMemorySpec.isMemoryReleasable()) { + return Integer.MAX_VALUE; + } else { Review Comment: We may not need the releasable / unreleasable at all. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. 
*/ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; Review Comment: It seems the periodical flush is not part of the memory management. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. */ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. 
*/ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; + + /** The number of requested buffers from {@link BufferPool}. */ + private final AtomicInteger numRequestedBuffers; + + /** A thread to check whether to flush buffers in each tiered storage. */ + private ScheduledExecutorService executor; + + /** The total number of guaranteed buffers for all tiered storages. */ + private int numTotalGuaranteedBuffers; + + /** The buffer pool where the buffer is requested or recyceld. */ + private BufferPool bufferPool; + + /** + * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before + * setting up, this field is false. + * + * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running + * state should be checked. + */ + private boolean isRunning; Review Comment: For `isRunning`, we should set it to `false` in `release()`. Or we can rename it to `initialized`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + /** Initial delay before checking flush. 
*/ + public static final int DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS = 10; + + /** Check flush period. */ + public static final int DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS = 50; + + /** The tiered storage memory specs of each memory user owner. */ + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + /** The registered callbacks to flush the buffers in the registered tiered storages. */ + private final List<Runnable> bufferFlushCallbacks; + + /** The buffer pool usage ratio of triggering the registered storages to flush buffers. */ + private final float numBuffersTriggerFlushRatio; + + /** + * Indicate whether to start the buffer flush checker thread. If the memory manager is used in + * downstream, the field will be false because no buffer flush checker is needed. + */ + private final boolean shouldStartBufferFlushChecker; + + /** The number of requested buffers from {@link BufferPool}. */ + private final AtomicInteger numRequestedBuffers; + + /** A thread to check whether to flush buffers in each tiered storage. */ + private ScheduledExecutorService executor; + + /** The total number of guaranteed buffers for all tiered storages. */ + private int numTotalGuaranteedBuffers; + + /** The buffer pool where the buffer is requested or recyceld. */ + private BufferPool bufferPool; + + /** + * Indicate whether the {@link TieredStorageMemoryManagerImpl} is in running state. Before + * setting up, this field is false. + * + * <p>Note that before requesting buffers or getting the maximum allowed buffers, this running + * state should be checked. + */ + private boolean isRunning; + + /** + * The constructor of the {@link TieredStorageMemoryManagerImpl}. 
+ * + * @param numBuffersTriggerFlushRatio the buffer pool usage ratio of triggering each tiered + * storage to flush buffers + * @param shouldStartBufferFlushChecker indicate whether to start the buffer flushing checker + * thread + */ + public TieredStorageMemoryManagerImpl( + float numBuffersTriggerFlushRatio, boolean shouldStartBufferFlushChecker) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.shouldStartBufferFlushChecker = shouldStartBufferFlushChecker; + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + this.bufferFlushCallbacks = new ArrayList<>(); + this.isRunning = false; + } + + @Override + public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) { + this.bufferPool = bufferPool; + for (TieredStorageMemorySpec memorySpec : storageMemorySpecs) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalGuaranteedBuffers += memorySpec.getNumGuaranteedBuffers(); + } + + if (shouldStartBufferFlushChecker) { + this.executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("buffer flush checker") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + this.executor.scheduleWithFixedDelay( + this::checkShouldFlushCachedBuffers, + DEFAULT_CHECK_FLUSH_INITIAL_DELAY_MS, + DEFAULT_CHECK_FLUSH_PERIOD_DURATION_MS, + TimeUnit.MILLISECONDS); + } + + this.isRunning = true; + } + + @Override + public void registerBufferFlushCallback(Runnable userBufferFlushCallBack) { + bufferFlushCallbacks.add(userBufferFlushCallBack); + } + + /** + * Request a {@link BufferBuilder} instance from {@link BufferPool} for a specific owner. The + * {@link TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested and + * only record the total number of requested buffers. 
If the buffers in the {@link BufferPool} + * is not enough, this will trigger each tiered storage to flush buffers as much as possible. + * + * @return the requested buffer + */ + @Override + public BufferBuilder requestBufferBlocking() { + checkIsRunning(); + + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + checkShouldFlushCachedBuffers(); + return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer); + } + + @Override + public int getMaxAllowedBuffers(Object owner) { + checkIsRunning(); + + TieredStorageMemorySpec ownerMemorySpec = checkNotNull(tieredMemorySpecs.get(owner)); + if (ownerMemorySpec.isMemoryReleasable()) { + return Integer.MAX_VALUE; + } else { + int ownerGuaranteedBuffers = ownerMemorySpec.getNumGuaranteedBuffers(); + return bufferPool.getNumBuffers() + - numRequestedBuffers.get() + - numTotalGuaranteedBuffers + + ownerGuaranteedBuffers; + } + } + + @Override + public void release() { + checkState(numRequestedBuffers.get() == 0, "Leaking buffers."); + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5L, TimeUnit.MINUTES)) { + throw new TimeoutException( + "Timeout for shutting down the cache flush executor."); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + } + } + + private void checkShouldFlushCachedBuffers() { + if (shouldFlushBuffers()) { + bufferFlushCallbacks.forEach(Runnable::run); + } + } + + private boolean shouldFlushBuffers() { + synchronized (this) { + int numTotal = bufferPool.getNumBuffers(); + int numRequested = numRequestedBuffers.get(); + return numRequested >= numTotal + || (numRequested * 1.0 / numTotal) >= numBuffersTriggerFlushRatio; + } Review Comment: This does not guarantees the atomicity, unless all access to `bufferPool` and 
`numRequestedBuffers` are guarded by `this`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
