xintongsong commented on code in PR #22352: URL: https://github.com/apache/flink/pull/22352#discussion_r1191854430
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. 
+ */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + private int numTotalExclusiveBuffers; + + private BufferPool bufferPool; + + private final AtomicInteger numRequestedBuffers; + + public TieredStorageMemoryManagerImpl() { + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + } + + @Override + public void setup(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + @Override + public void registerMemorySpec(TieredStorageMemorySpec memorySpec) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec registration."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalExclusiveBuffers += memorySpec.getNumExclusiveBuffers(); + } + + @Override + public BufferBuilder requestBufferBlocking() { + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer); + } Review Comment: We are not checking whether a buffer can be requested. This should be explained. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A manager to check whether the cached buffers need to be flushed. + * + * <p>Cached buffers refer to the buffers stored in a tier that can be released by flushing them to + * the disk or remote storage. If the requested buffers from the buffer pool reach the ratio limit, + * {@link CacheFlushManager} will trigger a process wherein all these buffers are flushed to the + * disk or remote storage to recycling these buffers. + * + * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link + * CacheBufferFlushTrigger}, as a listener to flush cached buffers. 
+ */ +public class CacheFlushManager { + private final float numBuffersTriggerFlushRatio; + + private final List<CacheBufferFlushTrigger> flushTriggers; + + private TieredStorageMemoryManager storageMemoryManager; + + private final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("cache flush trigger") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + + public CacheFlushManager(float numBuffersTriggerFlushRatio) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.flushTriggers = new ArrayList<>(); + + executor.scheduleWithFixedDelay( + this::checkNeedTriggerFlushCachedBuffers, 10, 50, TimeUnit.MILLISECONDS); + } + + public void setup(TieredStorageMemoryManager storageMemoryManager) { + this.storageMemoryManager = storageMemoryManager; + } + + public void registerCacheBufferFlushTrigger(CacheBufferFlushTrigger cacheBufferFlushTrigger) { + flushTriggers.add(cacheBufferFlushTrigger); + } + + public void triggerFlushCachedBuffers() { Review Comment: Should be private. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A manager to check whether the cached buffers need to be flushed. + * + * <p>Cached buffers refer to the buffers stored in a tier that can be released by flushing them to + * the disk or remote storage. If the requested buffers from the buffer pool reach the ratio limit, + * {@link CacheFlushManager} will trigger a process wherein all these buffers are flushed to the + * disk or remote storage to recycling these buffers. + * + * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link + * CacheBufferFlushTrigger}, as a listener to flush cached buffers. 
+ */ +public class CacheFlushManager { + private final float numBuffersTriggerFlushRatio; + + private final List<CacheBufferFlushTrigger> flushTriggers; + + private TieredStorageMemoryManager storageMemoryManager; + + private final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("cache flush trigger") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + + public CacheFlushManager(float numBuffersTriggerFlushRatio) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.flushTriggers = new ArrayList<>(); + + executor.scheduleWithFixedDelay( + this::checkNeedTriggerFlushCachedBuffers, 10, 50, TimeUnit.MILLISECONDS); + } + + public void setup(TieredStorageMemoryManager storageMemoryManager) { + this.storageMemoryManager = storageMemoryManager; + } + + public void registerCacheBufferFlushTrigger(CacheBufferFlushTrigger cacheBufferFlushTrigger) { + flushTriggers.add(cacheBufferFlushTrigger); + } + + public void triggerFlushCachedBuffers() { + flushTriggers.forEach(CacheBufferFlushTrigger::notifyFlushCachedBuffers); + } + + public void checkNeedTriggerFlushCachedBuffers() { + if (storageMemoryManager == null) { + return; + } + + if (TieredStorageUtils.needFlushCacheBuffers( + storageMemoryManager, numBuffersTriggerFlushRatio)) { + triggerFlushCachedBuffers(); + } + } + + public int numFlushTriggers() { Review Comment: Why do we need this? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. 
+ */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + private int numTotalExclusiveBuffers; + + private BufferPool bufferPool; + + private final AtomicInteger numRequestedBuffers; + + public TieredStorageMemoryManagerImpl() { + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + } + + @Override + public void setup(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + @Override + public void registerMemorySpec(TieredStorageMemorySpec memorySpec) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec registration."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalExclusiveBuffers += memorySpec.getNumExclusiveBuffers(); + } Review Comment: Do we allow adding new specs after setting up? How do we guarantee the exclusive buffers for the new spec? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +/** + * The memory specs for a memory owner, including the owner itself, the number of exclusive buffers + * of the owner, whether the owner's memory can be released when not consumed, etc. + */ +public class TieredStorageMemorySpec { + + private final Object owner; + + private final int numExclusiveBuffers; Review Comment: Maybe "guaranteed" or "required" rather than "exclusive". ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +/** + * The memory specs for a memory owner, including the owner itself, the number of exclusive buffers + * of the owner, whether the owner's memory can be released when not consumed, etc. + */ +public class TieredStorageMemorySpec { + + private final Object owner; + + private final int numExclusiveBuffers; + + private final boolean isMemoryReleasable; Review Comment: The name is a bit ambiguous. However, I cannot think of a better one. 
Maybe we should explain this with javadocs. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. 
+ */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + private int numTotalExclusiveBuffers; + + private BufferPool bufferPool; + + private final AtomicInteger numRequestedBuffers; + + public TieredStorageMemoryManagerImpl() { + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + } + + @Override + public void setup(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + @Override + public void registerMemorySpec(TieredStorageMemorySpec memorySpec) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec registration."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalExclusiveBuffers += memorySpec.getNumExclusiveBuffers(); + } + + @Override + public BufferBuilder requestBufferBlocking() { + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer); + } + + @Override + public int numAvailableBuffers(Object owner) { Review Comment: I'd suggest the name `getMaxAllowedBuffers`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. + */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + private int numTotalExclusiveBuffers; + + private BufferPool bufferPool; + + private final AtomicInteger numRequestedBuffers; Review Comment: Is this class thread-safe or not? Why is `numRequestedBuffers` atomic while `numTotalExclusiveBuffers` is not?
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; + +/** Utils for reading or writing to tiered store. */ +public class TieredStorageUtils { + public static boolean needFlushCacheBuffers( + TieredStorageMemoryManager storageMemoryManager, float numBuffersTriggerFlushRatio) { + int numTotal = storageMemoryManager.numTotalBuffers(); + int numRequested = storageMemoryManager.numRequestedBuffers(); Review Comment: These two reads are not atomic with respect to each other. Why not add a `getUtilization` method to the memory manager? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.util.ExceptionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers + * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer + * accumulator, etc. + * + * <p>Note that the memory owner should register its {@link TieredStorageMemorySpec} firstly before + * requesting buffers. 
+ */ +public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager { + + private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; + + private int numTotalExclusiveBuffers; + + private BufferPool bufferPool; + + private final AtomicInteger numRequestedBuffers; + + public TieredStorageMemoryManagerImpl() { + this.tieredMemorySpecs = new HashMap<>(); + this.numRequestedBuffers = new AtomicInteger(0); + } + + @Override + public void setup(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + @Override + public void registerMemorySpec(TieredStorageMemorySpec memorySpec) { + checkState( + !tieredMemorySpecs.containsKey(memorySpec.getOwner()), + "Duplicated memory spec registration."); + tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec); + numTotalExclusiveBuffers += memorySpec.getNumExclusiveBuffers(); + } + + @Override + public BufferBuilder requestBufferBlocking() { + MemorySegment requestedBuffer = null; + try { + requestedBuffer = bufferPool.requestMemorySegmentBlocking(); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable, "Failed to request memory segments."); + } + numRequestedBuffers.incrementAndGet(); + return new BufferBuilder(checkNotNull(requestedBuffer), this::recycleBuffer); + } Review Comment: I think we should describe the overall responsibility and protocol of the memory manager in the javadoc. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A manager to check whether the cached buffers need to be flushed. + * + * <p>Cached buffers refer to the buffers stored in a tier that can be released by flushing them to + * the disk or remote storage. If the requested buffers from the buffer pool reach the ratio limit, + * {@link CacheFlushManager} will trigger a process wherein all these buffers are flushed to the + * disk or remote storage to recycling these buffers. + * + * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link + * CacheBufferFlushTrigger}, as a listener to flush cached buffers. 
+ */ +public class CacheFlushManager { + private final float numBuffersTriggerFlushRatio; + + private final List<CacheBufferFlushTrigger> flushTriggers; + + private TieredStorageMemoryManager storageMemoryManager; + + private final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("cache flush trigger") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + + public CacheFlushManager(float numBuffersTriggerFlushRatio) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.flushTriggers = new ArrayList<>(); + + executor.scheduleWithFixedDelay( + this::checkNeedTriggerFlushCachedBuffers, 10, 50, TimeUnit.MILLISECONDS); + } + + public void setup(TieredStorageMemoryManager storageMemoryManager) { + this.storageMemoryManager = storageMemoryManager; + } + + public void registerCacheBufferFlushTrigger(CacheBufferFlushTrigger cacheBufferFlushTrigger) { + flushTriggers.add(cacheBufferFlushTrigger); + } + + public void triggerFlushCachedBuffers() { + flushTriggers.forEach(CacheBufferFlushTrigger::notifyFlushCachedBuffers); + } + + public void checkNeedTriggerFlushCachedBuffers() { Review Comment: Do we need this to be public? Do we need to manually call the check in addition to the scheduled one? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; + +/** Utils for reading or writing to tiered store. */ +public class TieredStorageUtils { + public static boolean needFlushCacheBuffers( + TieredStorageMemoryManager storageMemoryManager, float numBuffersTriggerFlushRatio) { + int numTotal = storageMemoryManager.numTotalBuffers(); + int numRequested = storageMemoryManager.numRequestedBuffers(); Review Comment: What if the requested buffers are not completed yet? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A manager to check whether the cached buffers need to be flushed. + * + * <p>Cached buffers refer to the buffers stored in a tier that can be released by flushing them to + * the disk or remote storage. If the requested buffers from the buffer pool reach the ratio limit, + * {@link CacheFlushManager} will trigger a process wherein all these buffers are flushed to the + * disk or remote storage to recycling these buffers. + * + * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link + * CacheBufferFlushTrigger}, as a listener to flush cached buffers. 
+ */ +public class CacheFlushManager { + private final float numBuffersTriggerFlushRatio; + + private final List<CacheBufferFlushTrigger> flushTriggers; + + private TieredStorageMemoryManager storageMemoryManager; + + private final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("cache flush trigger") + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); + + public CacheFlushManager(float numBuffersTriggerFlushRatio) { + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + this.flushTriggers = new ArrayList<>(); + + executor.scheduleWithFixedDelay( + this::checkNeedTriggerFlushCachedBuffers, 10, 50, TimeUnit.MILLISECONDS); Review Comment: Hardcoded. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheBufferFlushTrigger.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +/** Notify the specific tier to try flushing the cached buffers. */ +public interface CacheBufferFlushTrigger { Review Comment: This seems unnecessary. 
Why not just use `Runnable`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A manager to check whether the cached buffers need to be flushed. + * + * <p>Cached buffers refer to the buffers stored in a tier that can be released by flushing them to + * the disk or remote storage. 
If the requested buffers from the buffer pool reach the ratio limit, + * {@link CacheFlushManager} will trigger a process wherein all these buffers are flushed to the + * disk or remote storage to recycling these buffers. + * + * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link + * CacheBufferFlushTrigger}, as a listener to flush cached buffers. + */ +public class CacheFlushManager { Review Comment: It seems this is closely related to the memory manager. I wonder if it makes sense to combine them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
