reswqa commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1246319606


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileWriter.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link PartitionFileWriter} interface defines the write logic for 
different types of shuffle
+ * files.
+ */
+public interface PartitionFileWriter {
+
+    /**
+     * Write the buffers to the partition file. The written buffers may belong 
to multiple
+     * subpartitions, but these buffers will be consecutive in the file.
+     *
+     * @param partitionId the partition id
+     * @param buffersToWrite the buffers to be written to the partition file
+     * @return a future that is completed once the writing of the given buffers to the partition
+     *     file has finished.
+     */
+    CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite);
+
+    /** Release all the resources of the {@link PartitionFileWriter}. */
+    void release();
+
+    /**
+     * The {@link SubpartitionBufferContext} contains all the buffers 
belonging to one subpartition.
+     */
+    class SubpartitionBufferContext {
+
+        /** The subpartition id. */
+        private final int subpartitionId;
+
+        /** The {@link SegmentBufferContext}s belonging to the subpartition. */
+        private final List<SegmentBufferContext> segmentBufferContexts;
+
+        public SubpartitionBufferContext(
+                int subpartitionId, List<SegmentBufferContext> 
segmentBufferContexts) {
+            this.subpartitionId = subpartitionId;
+            this.segmentBufferContexts = segmentBufferContexts;
+        }
+
+        public int getSubpartitionId() {
+            return subpartitionId;
+        }
+
+        public List<SegmentBufferContext> getSegmentBufferContexts() {
+            return segmentBufferContexts;
+        }
+    }
+
+    /**
+     * The {@link SegmentBufferContext} contains all the buffers belonging to the segment. Note that
+     * when this context indicates that the segment is finished, the field {@code bufferAndIndexes}
+     * should be empty.
+     */
+    class SegmentBufferContext {
+
+        /** The segment id. */
+        private final int segmentId;
+
+        /** All the buffers belonging to the segment. */
+        private final List<Tuple2<Buffer, Integer>> bufferAndIndexes;
+
+        /** Whether it is necessary to finish the segment. */
+        private final boolean segmentFinished;
+
+        public SegmentBufferContext(
+                int segmentId,
+                List<Tuple2<Buffer, Integer>> bufferAndIndexes,
+                boolean segmentFinished) {
+            this.segmentId = segmentId;
+            this.bufferAndIndexes = bufferAndIndexes;
+            this.segmentFinished = segmentFinished;
+        }
+
+        public int getSegmentId() {
+            return segmentId;
+        }
+
+        public List<Tuple2<Buffer, Integer>> getBufferAndIndexes() {
+            return bufferAndIndexes;
+        }
+
+        public boolean isSegmentFinished() {

Review Comment:
   It seems that this method is unused.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link ProducerMergedPartitionFileIndex} is used by {@link 
ProducerMergedPartitionFileWriter}
+ * and {@link ProducerMergedPartitionFileReader}, to maintain the offset of 
each buffer in the
+ * physical file.
+ *
+ * <p>For efficiency, buffers from the same subpartition that are both 
logically (i.e. index in the
+ * subpartition) and physically (i.e. offset in the file) consecutive are 
combined into a {@link
+ * Region}.
+ *
+ * <pre>For example, the following buffers (indicated by 
subpartitionId-bufferIndex):
+ *   1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6
+ * will be combined into 5 regions (separated by '|'):
+ *   1-1, 1-2, 1-3 | 2-1, 2-2 | 2-5 | 1-4, 1-5 | 2-6
+ * </pre>
+ */
+public class ProducerMergedPartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartition.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<TreeMap<Integer, Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public ProducerMergedPartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new TreeMap<>());
+        }
+    }
+
+    /**
+     * Add buffers to the index.
+     *
+     * @param buffers to be added. Note, the provided buffers are required to 
be physically
+     *     consecutive and in the same order as in the file.
+     */
+    void addBuffers(List<FlushedBuffer> buffers) {
+        if (buffers.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(buffers);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) -> {
+                        Map<Integer, Region> regionMap = 
subpartitionRegions.get(subpartition);
+                        for (Region region : regions) {
+                            regionMap.put(region.getFirstBufferIndex(), 
region);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Get the subpartition's {@link Region} containing the specific buffer 
index.
+     *
+     * @param subpartitionId the subpartition id
+     * @param bufferIndex the buffer index
+     * @return the region containing the buffer index, or empty if the region is not found.
+     */
+    Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int 
bufferIndex) {
+        synchronized (lock) {
+            if (isReleased) {
+                return Optional.empty();
+            }
+            Map.Entry<Integer, Region> regionEntry =
+                    subpartitionRegions
+                            .get(subpartitionId.getSubpartitionId())
+                            .floorEntry(bufferIndex);
+            if (regionEntry == null) {
+                return Optional.empty();
+            }
+            Region region = regionEntry.getValue();
+            return bufferIndex < region.getFirstBufferIndex() + 
region.numBuffers
+                    ? Optional.of(region)
+                    : Optional.empty();
+        }
+    }
+
+    void release() {
+        synchronized (lock) {
+            subpartitionRegions.clear();
+            isReleased = true;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private static Map<Integer, List<Region>> 
convertToRegions(List<FlushedBuffer> buffers) {
+        Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>();
+        Iterator<FlushedBuffer> iterator = buffers.iterator();
+        FlushedBuffer firstBufferInRegion = iterator.next();
+        FlushedBuffer lastBufferInRegion = firstBufferInRegion;
+
+        while (iterator.hasNext()) {
+            FlushedBuffer currentBuffer = iterator.next();
+            if (currentBuffer.getSubpartitionId() != 
firstBufferInRegion.getSubpartitionId()
+                    || currentBuffer.getBufferIndex() != 
lastBufferInRegion.getBufferIndex() + 1) {
+                // The current buffer belongs to a new region, add the current 
region to the map
+                addRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+                firstBufferInRegion = currentBuffer;
+            }
+            lastBufferInRegion = currentBuffer;
+        }
+
+        // Add the last region to the map
+        addRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+        return subpartitionRegionMap;
+    }
+
+    private static void addRegionToMap(
+            FlushedBuffer firstBufferInRegion,
+            FlushedBuffer lastBufferInRegion,
+            Map<Integer, List<Region>> subpartitionRegionMap) {
+        checkArgument(
+                firstBufferInRegion.getSubpartitionId() == 
lastBufferInRegion.getSubpartitionId());
+        checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
+
+        subpartitionRegionMap
+                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
+                .add(
+                        new Region(
+                                firstBufferInRegion.getBufferIndex(),
+                                firstBufferInRegion.getFileOffset(),
+                                lastBufferInRegion.getBufferIndex()
+                                        - firstBufferInRegion.getBufferIndex()
+                                        + 1));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Classes
+    // ------------------------------------------------------------------------
+
+    /** Represents a buffer to be flushed. */
+    static class FlushedBuffer {
+        /** The subpartition id that the buffer belongs to. */
+        private final int subpartitionId;
+
+        /** The buffer index within the subpartition. */
+        private final int bufferIndex;
+
+        /** The file offset at which the buffer begins. */
+        private final long fileOffset;
+
+        FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset) {
+            this.subpartitionId = subpartitionId;
+            this.bufferIndex = bufferIndex;
+            this.fileOffset = fileOffset;
+        }
+
+        int getSubpartitionId() {
+            return subpartitionId;
+        }
+
+        int getBufferIndex() {
+            return bufferIndex;
+        }
+
+        long getFileOffset() {
+            return fileOffset;
+        }
+    }
+
+    /**
+     * Represents a series of buffers that are:
+     *
+     * <ul>
+     *   <li>From the same subpartition
+     *   <li>Logically (i.e. buffer index) consecutive
+     *   <li>Physically (i.e. offset in the file) consecutive
+     * </ul>
+     */
+    static class Region {
+
+        /** The buffer index of first buffer. */
+        private final int firstBufferIndex;
+
+        /** The file offset of the region. */
+        private final long regionFileOffset;
+
+        /** The number of buffers that the region contains. */
+        private final int numBuffers;
+
+        Region(int firstBufferIndex, long regionFileOffset, int numBuffers) {
+            this.firstBufferIndex = firstBufferIndex;
+            this.regionFileOffset = regionFileOffset;
+            this.numBuffers = numBuffers;
+        }
+
+        long getRegionFileOffset() {
+            return regionFileOffset;
+        }
+
+        int getNumBuffers() {

Review Comment:
   Unused method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int numSubpartitions;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+    /** Whether the current flush process has completed. */
+    private CompletableFuture<Void> hasFlushCompleted;

Review Comment:
   ```suggestion
       private CompletableFuture<Void> onGoingFlushFuture;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link ProducerMergedPartitionFileIndex} is used by {@link 
ProducerMergedPartitionFileWriter}
+ * and {@link ProducerMergedPartitionFileReader}, to maintain the offset of 
each buffer in the
+ * physical file.
+ *
+ * <p>For efficiency, buffers from the same subpartition that are both 
logically (i.e. index in the
+ * subpartition) and physically (i.e. offset in the file) consecutive are 
combined into a {@link
+ * Region}.
+ *
+ * <pre>For example, the following buffers (indicated by 
subpartitionId-bufferIndex):
+ *   1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6
+ * will be combined into 5 regions (separated by '|'):
+ *   1-1, 1-2, 1-3 | 2-1, 2-2 | 2-5 | 1-4, 1-5 | 2-6
+ * </pre>
+ */
+public class ProducerMergedPartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartition.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<TreeMap<Integer, Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public ProducerMergedPartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new TreeMap<>());
+        }
+    }
+
+    /**
+     * Add buffers to the index.
+     *
+     * @param buffers to be added. Note, the provided buffers are required to 
be physically
+     *     consecutive and in the same order as in the file.
+     */
+    void addBuffers(List<FlushedBuffer> buffers) {
+        if (buffers.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(buffers);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) -> {
+                        Map<Integer, Region> regionMap = 
subpartitionRegions.get(subpartition);
+                        for (Region region : regions) {
+                            regionMap.put(region.getFirstBufferIndex(), 
region);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Get the subpartition's {@link Region} containing the specific buffer 
index.
+     *
+     * @param subpartitionId the subpartition id
+     * @param bufferIndex the buffer index
+     * @return the region containing the buffer index, or empty if the region is not found.
+     */
+    Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int 
bufferIndex) {
+        synchronized (lock) {
+            if (isReleased) {
+                return Optional.empty();
+            }
+            Map.Entry<Integer, Region> regionEntry =
+                    subpartitionRegions
+                            .get(subpartitionId.getSubpartitionId())
+                            .floorEntry(bufferIndex);
+            if (regionEntry == null) {
+                return Optional.empty();
+            }
+            Region region = regionEntry.getValue();
+            return bufferIndex < region.getFirstBufferIndex() + 
region.numBuffers
+                    ? Optional.of(region)
+                    : Optional.empty();
+        }
+    }
+
+    void release() {
+        synchronized (lock) {
+            subpartitionRegions.clear();
+            isReleased = true;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private static Map<Integer, List<Region>> 
convertToRegions(List<FlushedBuffer> buffers) {
+        Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>();
+        Iterator<FlushedBuffer> iterator = buffers.iterator();
+        FlushedBuffer firstBufferInRegion = iterator.next();
+        FlushedBuffer lastBufferInRegion = firstBufferInRegion;
+
+        while (iterator.hasNext()) {
+            FlushedBuffer currentBuffer = iterator.next();
+            if (currentBuffer.getSubpartitionId() != 
firstBufferInRegion.getSubpartitionId()
+                    || currentBuffer.getBufferIndex() != 
lastBufferInRegion.getBufferIndex() + 1) {
+                // The current buffer belongs to a new region, add the current 
region to the map
+                addRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+                firstBufferInRegion = currentBuffer;
+            }
+            lastBufferInRegion = currentBuffer;
+        }
+
+        // Add the last region to the map
+        addRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+        return subpartitionRegionMap;
+    }
+
+    private static void addRegionToMap(
+            FlushedBuffer firstBufferInRegion,
+            FlushedBuffer lastBufferInRegion,
+            Map<Integer, List<Region>> subpartitionRegionMap) {
+        checkArgument(
+                firstBufferInRegion.getSubpartitionId() == 
lastBufferInRegion.getSubpartitionId());
+        checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
+
+        subpartitionRegionMap
+                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
+                .add(
+                        new Region(
+                                firstBufferInRegion.getBufferIndex(),
+                                firstBufferInRegion.getFileOffset(),
+                                lastBufferInRegion.getBufferIndex()
+                                        - firstBufferInRegion.getBufferIndex()
+                                        + 1));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Classes
+    // ------------------------------------------------------------------------
+
+    /** Represents a buffer to be flushed. */
+    static class FlushedBuffer {
+        /** The subpartition id that the buffer belongs to. */
+        private final int subpartitionId;
+
+        /** The buffer index within the subpartition. */
+        private final int bufferIndex;
+
+        /** The file offset at which the buffer begins. */
+        private final long fileOffset;
+
+        FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset) {
+            this.subpartitionId = subpartitionId;
+            this.bufferIndex = bufferIndex;
+            this.fileOffset = fileOffset;
+        }
+
+        int getSubpartitionId() {
+            return subpartitionId;
+        }
+
+        int getBufferIndex() {
+            return bufferIndex;
+        }
+
+        long getFileOffset() {
+            return fileOffset;
+        }
+    }
+
+    /**
+     * Represents a series of buffers that are:
+     *
+     * <ul>
+     *   <li>From the same subpartition
+     *   <li>Logically (i.e. buffer index) consecutive
+     *   <li>Physically (i.e. offset in the file) consecutive
+     * </ul>
+     */
+    static class Region {
+
+        /** The buffer index of first buffer. */
+        private final int firstBufferIndex;
+
+        /** The file offset of the region. */
+        private final long regionFileOffset;
+
+        /** The number of buffers that the region contains. */
+        private final int numBuffers;
+
+        Region(int firstBufferIndex, long regionFileOffset, int numBuffers) {
+            this.firstBufferIndex = firstBufferIndex;
+            this.regionFileOffset = regionFileOffset;
+            this.numBuffers = numBuffers;
+        }
+
+        long getRegionFileOffset() {

Review Comment:
   Unused method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link SubpartitionDiskCacheManager} is responsible for managing the cached buffers in a
+ * single subpartition.
+ */
+class SubpartitionDiskCacheManager {
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the second field of the
+     * tuple is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    /**
+     * Record the buffer index in the {@link SubpartitionDiskCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     */
+    private int bufferIndex;
+
+    /**
+     * Record the id of the segment that is being written to. Each time the segment is finished, this
+     * field is increased by one.
+     */
+    private int segmentIndex;
+
+    // ------------------------------------------------------------------------
+    //  Called by DiskCacheManager
+    // ------------------------------------------------------------------------
+
+    void append(Buffer buffer) {
+        addBuffer(buffer);
+    }
+
+    void appendEndOfSegmentEvent(ByteBuffer record, DataType dataType) {
+        writeEvent(record, dataType);
+        segmentIndex++;
+    }
+
+    /** Note that allBuffers can be touched by multiple threads. */
+    List<Tuple2<Buffer, Integer>> removeAllBuffers() {
+        synchronized (allBuffers) {
+            List<Tuple2<Buffer, Integer>> targetBuffers = new 
ArrayList<>(allBuffers);
+            allBuffers.clear();
+            return targetBuffers;
+        }
+    }
+
+    int getBufferIndex() {
+        return bufferIndex;
+    }
+
+    int getSegmentIndex() {
+        return segmentIndex;
+    }
+
+    void release() {
+        recycleBuffers();
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void writeEvent(ByteBuffer event, DataType dataType) {
+        checkArgument(dataType.isEvent());
+
+        MemorySegment data = MemorySegmentFactory.wrap(event.array());
+        addBuffer(new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, 
dataType, data.size()));
+    }
+
+    /** Note that allBuffers can be touched by multiple threads. */

Review Comment:
   Is this method only called by the task thread?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int numSubpartitions;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+    /** Whether the current flush process has completed. */
+    private CompletableFuture<Void> hasFlushCompleted;
+
+    DiskCacheManager(
+            TieredStoragePartitionId partitionId,
+            int numSubpartitions,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.numSubpartitions = numSubpartitions;
+        this.partitionFileWriter = partitionFileWriter;
+        this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
+        this.hasFlushCompleted = FutureUtils.completedVoidFuture();
+
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionCacheManagers[subpartitionId] = new 
SubpartitionDiskCacheManager();
+        }
+        
storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by DiskTierProducerAgent
+    // ------------------------------------------------------------------------
+
+    /**
+     * Append buffer to {@link DiskCacheManager}.
+     *
+     * @param buffer to be managed by this class.
+     * @param subpartitionId the subpartition of this record.
+     */
+    void append(Buffer buffer, int subpartitionId) {
+        subpartitionCacheManagers[subpartitionId].append(buffer);
+    }
+
+    /**
+     * Append the end-of-segment event to {@link DiskCacheManager}, which 
indicates the segment has
+     * finished.
+     *
+     * @param record the end-of-segment event
+     * @param subpartitionId target subpartition of this record.
+     * @param dataType the type of this record. In other words, is it data or 
event.
+     */
+    void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId, 
Buffer.DataType dataType) {

Review Comment:
   It seems the `Buffer.DataType dataType` parameter can be removed safely.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link SubpartitionDiskCacheManager} is responsible for managing the 
cached buffers in a single
+ * subpartition.
+ */
+class SubpartitionDiskCacheManager {
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * tuple is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    /**
+     * Record the buffer index in the {@link SubpartitionDiskCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     */
+    private int bufferIndex;

Review Comment:
   If this field is only touched by the task thread, we'd better add some explanation.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/SubpartitionDiskCacheManager.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link SubpartitionDiskCacheManager} is responsible for managing the 
cached buffers in a single
+ * subpartition.
+ */
+class SubpartitionDiskCacheManager {
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * tuple is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    /**
+     * Record the buffer index in the {@link SubpartitionDiskCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     */
+    private int bufferIndex;
+
+    /**
+     * Record the segment id that is being written to. Each time the segment is 
finished, this field
+     * is increased by one.
+     */
+    private int segmentIndex;

Review Comment:
   I'm a little suspicious that this field is unsafe.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Utils for reading or writing from tiered storage. */
+public class TieredStorageUtils {
+
+    public static final String DATA_FILE_SUFFIX = ".storage.data";

Review Comment:
   ```suggestion
       public static final String DATA_FILE_SUFFIX = ".tier-storage.data";
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link DiskCacheManager} is responsible for managing cached buffers 
before flushing to files.
+ */
+class DiskCacheManager {
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int numSubpartitions;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
+
+    /** Whether the current flush process has completed. */
+    private CompletableFuture<Void> hasFlushCompleted;
+
+    DiskCacheManager(
+            TieredStoragePartitionId partitionId,
+            int numSubpartitions,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.numSubpartitions = numSubpartitions;
+        this.partitionFileWriter = partitionFileWriter;
+        this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
+        this.hasFlushCompleted = FutureUtils.completedVoidFuture();
+
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionCacheManagers[subpartitionId] = new 
SubpartitionDiskCacheManager();
+        }
+        
storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by DiskTierProducerAgent
+    // ------------------------------------------------------------------------
+
+    /**
+     * Append buffer to {@link DiskCacheManager}.
+     *
+     * @param buffer to be managed by this class.
+     * @param subpartitionId the subpartition of this record.
+     */
+    void append(Buffer buffer, int subpartitionId) {
+        subpartitionCacheManagers[subpartitionId].append(buffer);
+    }
+
+    /**
+     * Append the end-of-segment event to {@link DiskCacheManager}, which 
indicates the segment has
+     * finished.
+     *
+     * @param record the end-of-segment event
+     * @param subpartitionId target subpartition of this record.
+     * @param dataType the type of this record. In other words, is it data or 
event.
+     */
+    void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId, 
Buffer.DataType dataType) {
+        
subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record, 
dataType);
+
+        // When finishing a segment, the buffers should be flushed because the 
next segment may be
+        // written to another tier. If the buffers in this tier are not 
flushed here, then the next
+        // segment in another tier may be stuck by lacking buffers. This flush 
has a low trigger
+        // frequency, so its impact on performance is relatively small.
+        forceFlushCachedBuffers();
+    }
+
+    /**
+     * Return the current buffer index.
+     *
+     * @param subpartitionId the target subpartition id
+     * @return the finished buffer index
+     */
+    int getBufferIndex(int subpartitionId) {
+        return subpartitionCacheManagers[subpartitionId].getBufferIndex();
+    }
+
+    /** Close this {@link DiskCacheManager}, after which no more data can be appended to 
memory. */
+    void close() {
+        forceFlushCachedBuffers();
+    }
+
+    /**
+     * Release this {@link DiskCacheManager}, which means all memory taken by 
this class will be recycled.
+     */
+    void release() {
+        
Arrays.stream(subpartitionCacheManagers).forEach(SubpartitionDiskCacheManager::release);
+        partitionFileWriter.release();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void notifyFlushCachedBuffers() {
+        flushBuffers(false);
+    }
+
+    private void forceFlushCachedBuffers() {
+        flushBuffers(true);
+    }
+
+    /**
+     * Note that the request of flushing buffers may come from the disk check 
thread or the task
+     * thread, so the method itself should ensure the thread safety.
+     */
+    private synchronized void flushBuffers(boolean needForceFlush) {

Review Comment:
   ```suggestion
       private synchronized void flushBuffers(boolean forceFlush) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to