xintongsong commented on code in PR #22352:
URL: https://github.com/apache/flink/pull/22352#discussion_r1181474271


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/CacheFlushManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A manager to check whether the cached buffers need to be flushed.
+ *
+ * <p>Note that when initializing a tier with cached buffers, the tier should register a {@link
+ * CacheBufferFlushTrigger}, as a listener to flush cached buffers.
+ */
+public class CacheFlushManager {

Review Comment:
   I can see what this class does from the implementation, but it is unclear 
to me why we need it. Why would we need to flush everything once a certain 
ratio of the buffers in the pool is in use?
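
To make the question concrete, the behaviour I am describing can be sketched roughly as below. This is a simplified illustration, not the code in this PR: `RatioFlushSketch`, `FlushTrigger`, the `0.5` ratio, and the explicit `check` call are all assumptions made for the example (the imports suggest the real class polls from a scheduled executor instead).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; names and threshold are illustrative, not taken from the PR. */
final class RatioFlushSketch {

    /** Stand-in for a per-tier flush listener. */
    interface FlushTrigger {
        void flush();
    }

    private final List<FlushTrigger> triggers = new ArrayList<>();
    private final double flushRatio;

    RatioFlushSketch(double flushRatio) {
        this.flushRatio = flushRatio;
    }

    void register(FlushTrigger trigger) {
        triggers.add(trigger);
    }

    /**
     * Notifies every registered trigger once the used fraction of the pool
     * exceeds the ratio. Returns whether a flush was triggered.
     */
    boolean check(int usedBuffers, int poolSize) {
        if (poolSize <= 0 || (double) usedBuffers / poolSize <= flushRatio) {
            return false;
        }
        for (FlushTrigger trigger : triggers) {
            trigger.flush();
        }
        return true;
    }

    public static void main(String[] args) {
        RatioFlushSketch manager = new RatioFlushSketch(0.5);
        manager.register(() -> System.out.println("tier 0 flushes its cache"));
        manager.register(() -> System.out.println("tier 1 flushes its cache"));
        manager.check(4, 10); // below the ratio: no trigger is called
        manager.check(6, 10); // above the ratio: every trigger is called
    }
}
```

In this shape every registered tier is asked to flush, regardless of which tier actually holds the buffers, which is the part I would like to see motivated.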



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffer from {@link
+ * LocalBufferPool} for different tiers or {@link BufferAccumulator}.
+ *
+ * <p>The buffers of {@link BufferAccumulator} is also managed in the {@link
+ * TieredStorageMemoryManager}. When {@link BufferAccumulator} requests or recycles buffers, it uses
+ * separate methods that are different from tiers.
+ */
+public interface TieredStorageMemoryManager {
+
+    /** Setup the {@link TieredStorageMemoryManager}. */
+    void setup(BufferPool bufferPool);
+
+    /**
+     * Request a {@link MemorySegment} instance from {@link LocalBufferPool} for a specific tier.
+     *
+     * @param tierIndex the tier index
+     * @return the requested buffer
+     */
+    MemorySegment requestBufferBlocking(int tierIndex);
+
+    /**
+     * Recycle a {@link MemorySegment} buffer to {@link LocalBufferPool} for a specific tier.
+     *
+     * @param memorySegment the buffer to be recycled
+     * @param tierIndex the specific tier index
+     */
+    void recycleBuffer(MemorySegment memorySegment, int tierIndex);
+
+    /** Request a {@link MemorySegment} buffer for the buffer accumulator. */
+    MemorySegment requestBufferInAccumulator();
+
+    /** Recycle a {@link MemorySegment} buffer for the buffer accumulator. */
+    void recycleBufferInAccumulator(MemorySegment memorySegment);
+
+    /**
+     * Increase the number of requested buffers for a tier.
+     *
+     * @param tierIndex the specific tier index
+     */
+    void incNumRequestedBuffer(int tierIndex);
+
+    /**
+     * Decrease the number of requested buffers for a tier.
+     *
+     * @param tierIndex the specific tier index
+     */
+    void decNumRequestedBuffer(int tierIndex);
+
+    /** Increase the number of requested buffers for buffer accumulator. */
+    void incNumRequestedBufferInAccumulator();
+
+    /** Decrease the number of requested buffers for buffer accumulator. */
+    void decNumRequestedBufferInAccumulator();

Review Comment:
   These seem to be internal methods and should not be exposed via the 
interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffer from {@link
+ * LocalBufferPool} for different tiers or {@link BufferAccumulator}.
+ *
+ * <p>The buffers of {@link BufferAccumulator} is also managed in the {@link
+ * TieredStorageMemoryManager}. When {@link BufferAccumulator} requests or recycles buffers, it uses
+ * separate methods that are different from tiers.
+ */
+public interface TieredStorageMemoryManager {
+
+    /** Setup the {@link TieredStorageMemoryManager}. */
+    void setup(BufferPool bufferPool);
+
+    /**
+     * Request a {@link MemorySegment} instance from {@link LocalBufferPool} for a specific tier.
+     *
+     * @param tierIndex the tier index
+     * @return the requested buffer
+     */
+    MemorySegment requestBufferBlocking(int tierIndex);
+
+    /**
+     * Recycle a {@link MemorySegment} buffer to {@link LocalBufferPool} for a specific tier.
+     *
+     * @param memorySegment the buffer to be recycled
+     * @param tierIndex the specific tier index
+     */
+    void recycleBuffer(MemorySegment memorySegment, int tierIndex);
+
+    /** Request a {@link MemorySegment} buffer for the buffer accumulator. */
+    MemorySegment requestBufferInAccumulator();
+
+    /** Recycle a {@link MemorySegment} buffer for the buffer accumulator. */
+    void recycleBufferInAccumulator(MemorySegment memorySegment);

Review Comment:
   Why do we have different interfaces for tiers and the accumulator? How do 
they differ as memory consumers?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMemorySpec;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different tiers or {@link BufferAccumulator}.
+ *
+ * <p>The buffers in a buffer pool are mainly divided into types, the exclusive buffers for a tier
+ * and the shared buffers for all tiers. Each tier decides the number of exclusive buffers and
+ * whether it can use the shared buffers by {@link TierMemorySpec}.
+ *
+ * <p>The buffers of {@link BufferAccumulator} is also managed in the {@link
+ * TieredStorageMemoryManager}. When {@link BufferAccumulator} requests or recycles buffers, it uses
+ * separate methods that are different from tiers.
+ *
+ * <p>Note that when initializing the {@link TieredStorageMemoryManagerImpl}, the {@link
+ * TierMemorySpec} of each tier should be ready. These tier memory specs mainly decide how many
+ * buffers can be used for every tier.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {

Review Comment:
   How does this memory manager guarantee the exclusive buffers? I see that 
this class keeps track of the number of buffers each tier and the accumulator 
use. However, I don't see any logic for reserving exclusive buffers for a tier, 
or for preventing a tier from requesting too many buffers.
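
As an illustration of the kind of logic I would expect to find, here is a minimal sketch of exclusive/shared accounting. It is not what the PR implements and not a proposal for the final design: `ExclusiveQuotaSketch`, `tryAcquire`, and `release` are hypothetical names, and the sketch only counts buffers instead of handing out real memory segments.

```java
/** Hypothetical sketch of exclusive/shared accounting; not the PR's implementation. */
final class ExclusiveQuotaSketch {

    private final int[] numExclusive;     // reserved buffers per tier
    private final boolean[] canUseShared; // whether a tier may borrow shared buffers
    private final int[] used;             // buffers currently held per tier
    private final int sharedTotal;        // pool size minus all reservations
    private int sharedUsed;

    ExclusiveQuotaSketch(int poolSize, int[] numExclusive, boolean[] canUseShared) {
        if (numExclusive.length != canUseShared.length) {
            throw new IllegalArgumentException("one spec entry per tier is required");
        }
        int reserved = 0;
        for (int n : numExclusive) {
            reserved += n;
        }
        if (reserved > poolSize) {
            throw new IllegalArgumentException("exclusive buffers exceed the pool size");
        }
        this.numExclusive = numExclusive.clone();
        this.canUseShared = canUseShared.clone();
        this.used = new int[numExclusive.length];
        this.sharedTotal = poolSize - reserved;
    }

    /** Records one more buffer for the tier if its quota allows it. */
    synchronized boolean tryAcquire(int tier) {
        if (used[tier] < numExclusive[tier]) {
            used[tier]++; // still inside the tier's own reservation
            return true;
        }
        if (canUseShared[tier] && sharedUsed < sharedTotal) {
            used[tier]++;
            sharedUsed++; // borrowed from the shared part
            return true;
        }
        return false;
    }

    /** Gives one buffer back; buffers above the reservation return to the shared part. */
    synchronized void release(int tier) {
        if (used[tier] == 0) {
            throw new IllegalStateException("tier " + tier + " holds no buffers");
        }
        if (used[tier] > numExclusive[tier]) {
            sharedUsed--;
        }
        used[tier]--;
    }

    public static void main(String[] args) {
        ExclusiveQuotaSketch quota =
                new ExclusiveQuotaSketch(5, new int[] {2, 1}, new boolean[] {true, false});
        System.out.println("tier 1, first request:  " + quota.tryAcquire(1));
        System.out.println("tier 1, second request: " + quota.tryAcquire(1));
    }
}
```

Because the shared part is sized as the pool minus all reservations, a tier that borrows shared buffers can never eat into another tier's exclusive share, and a tier with `canUseShared` set to `false` is capped at its own reservation.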



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+
+/**
+ * The {@link TieredStorageMemoryManager} is to request or recycle buffer from {@link
+ * LocalBufferPool} for different tiers or {@link BufferAccumulator}.
+ *
+ * <p>The buffers of {@link BufferAccumulator} is also managed in the {@link
+ * TieredStorageMemoryManager}. When {@link BufferAccumulator} requests or recycles buffers, it uses
+ * separate methods that are different from tiers.
+ */
+public interface TieredStorageMemoryManager {
+
+    /** Setup the {@link TieredStorageMemoryManager}. */
+    void setup(BufferPool bufferPool);
+
+    /**
+     * Request a {@link MemorySegment} instance from {@link LocalBufferPool} for a specific tier.
+     *
+     * @param tierIndex the tier index
+     * @return the requested buffer
+     */
+    MemorySegment requestBufferBlocking(int tierIndex);
+
+    /**
+     * Recycle a {@link MemorySegment} buffer to {@link LocalBufferPool} for a specific tier.
+     *
+     * @param memorySegment the buffer to be recycled
+     * @param tierIndex the specific tier index
+     */
+    void recycleBuffer(MemorySegment memorySegment, int tierIndex);

Review Comment:
   Why not return a `Buffer` directly? Then we would not need a `recycle` 
method on the interface at all, because the recycler can be set on the returned 
buffer.
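
A rough sketch of the idea, using stand-in types rather than Flink's real classes: `PooledBuffer` and `Recycler` below are hypothetical simplifications loosely modeled on `Buffer` and `BufferRecycler`, a `byte[]` plays the role of the memory segment, and the request throws instead of blocking to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch with stand-in types; not Flink's actual Buffer API. */
final class RecyclerSketch {

    /** Stand-in for a recycler callback. */
    interface Recycler {
        void recycle(byte[] segment);
    }

    /** Stand-in for a buffer that knows how to return its own memory. */
    static final class PooledBuffer {
        private final byte[] segment;
        private final Recycler recycler;
        private boolean recycled;

        PooledBuffer(byte[] segment, Recycler recycler) {
            this.segment = segment;
            this.recycler = recycler;
        }

        void recycle() {
            if (recycled) {
                throw new IllegalStateException("buffer already recycled");
            }
            recycled = true;
            recycler.recycle(segment);
        }
    }

    /** Memory manager exposing only a request method; bookkeeping stays internal. */
    static final class Manager {
        private final Deque<byte[]> free = new ArrayDeque<>();
        private final int[] requestedPerTier;

        Manager(int numBuffers, int numTiers) {
            for (int i = 0; i < numBuffers; i++) {
                free.add(new byte[32]);
            }
            this.requestedPerTier = new int[numTiers];
        }

        synchronized PooledBuffer request(int tierIndex) {
            byte[] segment = free.poll();
            if (segment == null) {
                throw new IllegalStateException("pool exhausted");
            }
            requestedPerTier[tierIndex]++;
            // The recycler remembers the owning tier, so callers never pass it back.
            return new PooledBuffer(segment, s -> release(s, tierIndex));
        }

        private synchronized void release(byte[] segment, int tierIndex) {
            requestedPerTier[tierIndex]--;
            free.add(segment);
        }

        synchronized int numRequested(int tierIndex) {
            return requestedPerTier[tierIndex];
        }

        synchronized int numAvailable() {
            return free.size();
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager(2, 2);
        PooledBuffer buffer = manager.request(1);
        System.out.println("requested by tier 1: " + manager.numRequested(1));
        buffer.recycle();
        System.out.println("requested by tier 1: " + manager.numRequested(1));
    }
}
```

With the recycler bound at request time, the per-tier counters are updated inside the manager, so neither a `recycleBuffer` method nor public increment/decrement methods are needed on the interface.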



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMemorySpec.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
+
+/**
+ * The memory specs for a tier, including the tier index, the number of exclusive buffers of the
+ * tier, whether the tier can use the shared buffers in the memory manager, etc.
+ */
+public class TierMemorySpec {
+
+    private final int tierIndex;
+
+    private final int numExclusiveBuffers;
+
+    private final boolean canUseSharedBuffers;
+
+    public TierMemorySpec(int tierIndex, int numExclusiveBuffers, boolean canUseSharedBuffers) {
+        this.tierIndex = tierIndex;
+        this.numExclusiveBuffers = numExclusiveBuffers;
+        this.canUseSharedBuffers = canUseSharedBuffers;
+    }
+
+    public int getTierIndex() {
+        return tierIndex;
+    }
+
+    public int getNumExclusiveBuffers() {
+        return numExclusiveBuffers;
+    }
+
+    public boolean canUseShareBuffers() {
+        return canUseSharedBuffers;

Review Comment:
   Under what condition should this be `false`?


