xintongsong commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1240656333


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */

Review Comment:
   ```suggestion
   /**
    * The {@link ProducerMergedPartitionFileIndex} is used by {@link 
ProducerMergePartitionFileWriter}
    * and {@link ProducerMergePartitionFileReader}, to maintain the offset of 
each buffer in the
    * physical file.
    *
    * <p>For efficiency, buffers from the same subpartition that are both 
logically (i.e. index in the
    * subpartition) and physically (i.e. offset in the file) consecutive are 
combined into a {@link
    * Region}.
    *
    * <pre>For example, the following buffers (indicated by 
subpartitionId-bufferIndex):
    *   1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6
    * will be combined into 5 regions (separated by '|'):
    *   1-1, 1-2, 1-3 | 2-1, 2-2 | 2-5 | 1-4, 1-5 | 2-6
    * </pre>
    */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartitions.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<List<Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public PartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Based on the input {@link BufferToFlush}s, generate the {@link Region}s 
accordingly. When the
+     * buffer's subpartition id changes or the buffer index changes, a new 
region is created. For
+     * example, the buffers are as follows(each buffer is represented by
+     * subpartitionId-bufferIndex). 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 
2-6, then 5 regions are
+     * generated. |1-1, 1-2, 1-3|2-1, 2-2|2-5|1-4, 1-5|2-6|.
+     *
+     * <p>Note that these buffers are logically partitioned by the region 
indexes logically, but
+     * they remain physically contiguous when flushing to disk.
+     *
+     * @param bufferToFlushes the buffers to be flushed
+     */
+    void generateRegionsBasedOnBuffers(List<BufferToFlush> bufferToFlushes) {
+        if (bufferToFlushes.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(bufferToFlushes);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) ->
+                            
subpartitionRegions.get(subpartition).addAll(regions));
+        }
+    }
+
+    /**
+     * Get the {@link Region} of the specific subpartition.
+     *
+     * @param subpartitionId the specific subpartition id
+     * @param regionId the region id to get from the {@link PartitionFileIndex}
+     */
+    public Optional<Region> getRegion(int subpartitionId, int regionId) {

Review Comment:
   How does the caller know the region id?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartitions.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<List<Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public PartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Based on the input {@link BufferToFlush}s, generate the {@link Region}s 
accordingly. When the
+     * buffer's subpartition id changes or the buffer index changes, a new 
region is created. For
+     * example, the buffers are as follows(each buffer is represented by
+     * subpartitionId-bufferIndex). 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 
2-6, then 5 regions are
+     * generated. |1-1, 1-2, 1-3|2-1, 2-2|2-5|1-4, 1-5|2-6|.
+     *
+     * <p>Note that these buffers are logically partitioned by the region 
indexes logically, but
+     * they remain physically contiguous when flushing to disk.
+     *
+     * @param bufferToFlushes the buffers to be flushed
+     */
+    void generateRegionsBasedOnBuffers(List<BufferToFlush> bufferToFlushes) {
+        if (bufferToFlushes.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(bufferToFlushes);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) ->
+                            
subpartitionRegions.get(subpartition).addAll(regions));
+        }
+    }
+
+    /**
+     * Get the {@link Region} of the specific subpartition.
+     *
+     * @param subpartitionId the specific subpartition id
+     * @param regionId the region id to get from the {@link PartitionFileIndex}
+     */
+    public Optional<Region> getRegion(int subpartitionId, int regionId) {
+        synchronized (lock) {
+            if (isReleased) {
+                return Optional.empty();
+            }
+            List<Region> currentRegions = 
subpartitionRegions.get(subpartitionId);
+            if (regionId < currentRegions.size()) {
+                return Optional.of(currentRegions.get(regionId));
+            }
+            return Optional.empty();
+        }
+    }
+
+    void release() {
+        synchronized (lock) {
+            subpartitionRegions.clear();
+            isReleased = true;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private static Map<Integer, List<Region>> convertToRegions(
+            List<BufferToFlush> bufferToFlushes) {
+        Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>();
+        Iterator<BufferToFlush> iterator = bufferToFlushes.iterator();
+        BufferToFlush firstBufferInRegion = iterator.next();
+        BufferToFlush lastBufferInRegion = firstBufferInRegion;
+
+        while (iterator.hasNext()) {
+            BufferToFlush currentBuffer = iterator.next();
+            if (currentBuffer.getSubpartitionId() != 
firstBufferInRegion.getSubpartitionId()
+                    || currentBuffer.getBufferIndex() != 
lastBufferInRegion.getBufferIndex() + 1) {
+                // the current buffer belongs to a new region, close the 
previous region
+                addInternalRegionToMap(
+                        firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+                firstBufferInRegion = currentBuffer;
+            }
+            lastBufferInRegion = currentBuffer;
+        }
+
+        addInternalRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+        return subpartitionRegionMap;
+    }
+
+    private static void addInternalRegionToMap(
+            BufferToFlush firstBufferInRegion,
+            BufferToFlush lastBufferInRegion,
+            Map<Integer, List<Region>> subpartitionRegionMap) {
+        checkArgument(
+                firstBufferInRegion.getSubpartitionId() == 
lastBufferInRegion.getSubpartitionId());
+        checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
+
+        subpartitionRegionMap
+                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
+                .add(
+                        new Region(
+                                firstBufferInRegion.getFileOffset(),
+                                lastBufferInRegion.getBufferIndex()
+                                        - firstBufferInRegion.getBufferIndex()
+                                        + 1));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Classes
+    // ------------------------------------------------------------------------
+
+    /** Represents a buffer to be flushed. */
+    public static class BufferToFlush {
+        /** The subpartition id that the buffer belongs to. */
+        private final int subpartitionId;
+
+        /** The buffer index within the subpartition. */
+        private final int bufferIndex;
+
+        /** The file offset that the buffer begin with. */
+        private final long fileOffset;
+
+        BufferToFlush(int subpartitionId, int bufferIndex, long fileOffset) {
+            this.subpartitionId = subpartitionId;
+            this.bufferIndex = bufferIndex;
+            this.fileOffset = fileOffset;
+        }
+
+        public int getSubpartitionId() {
+            return subpartitionId;
+        }
+
+        public int getBufferIndex() {
+            return bufferIndex;
+        }
+
+        public long getFileOffset() {
+            return fileOffset;
+        }
+    }
+
+    /**
+     * A {@link Region} represents a series of physically continuous buffers 
in the file, which are
+     * from the same subpartition.
+     */

Review Comment:
   ```suggestion
       /**
        * Represents a series of buffers that are:
        *
        * <ul>
        *   <li>From the same subpartition
        *   <li>Logically (i.e. buffer index) consecutive
        *   <li>Physically (i.e. offset in the file) consecutive
        * </ul>
        */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {

Review Comment:
   ```suggestion
   public class ProducerMergedPartitionFileIndex {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+
+/** {@link PartitionFileReader} defines the read logic for different types of 
shuffle files. */
+public interface PartitionFileReader {
+
+    /**
+     * Read a buffer from the partition file.
+     *
+     * @param partitionId the partition id of the buffer
+     * @param subpartitionId the subpartition id of the buffer
+     * @param segmentId the segment id of the buffer
+     * @param bufferIndex the index of buffer
+     * @param memorySegment the empty buffer to store the read buffer
+     * @param recycler the buffer recycler
+     * @return the read buffer
+     */
+    Buffer readBuffer(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex,
+            MemorySegment memorySegment,
+            BufferRecycler recycler)
+            throws IOException;
+
+    /**
+     * Get the priority when reading partition file data. The priority may 
improve the read
+     * efficiency or the read performance. For example, use the file offset of 
the reader as the
+     * priority to achieve better disk sequential reading to improve the read 
performance. Note that
+     * the reader priority is not a guaranteed ability for a reader, and all 
the readers will return
+     * the same priority value if the readers have no the priorities.
+     *
+     * @param partitionId the partition id of the buffer
+     * @param subpartitionId the subpartition id of the buffer
+     * @param segmentId the segment id of the buffer
+     * @param bufferIndex the index of buffer
+     * @return the priority of the {@link PartitionFileReader}.
+     */

Review Comment:
   ```suggestion
       /**
        * Get the priority for reading a particular buffer from the partitioned 
file. The priority is
        * defined as, it is suggested to read buffers with higher priority 
(smaller value) in prior to
        * buffers with lower priority (larger value).
        *
        * <p>Depending on the partition file implementation, following the 
suggestions should typically
        * result in better performance and efficiency. This can be achieved by 
e.g. choosing preloaded
        * data over others, optimizing the order of disk access to be more 
sequential, etc.
        *
        * <p>Note: Priorities are suggestions rather than a requirements. The 
caller can still read
        * data in whichever order it wants.
        *
        * @param partitionId the partition id of the buffer
        * @param subpartitionId the subpartition id of the buffer
        * @param segmentId the segment id of the buffer
        * @param bufferIndex the index of buffer
        * @return the priority of the {@link PartitionFileReader}.
        */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartitions.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<List<Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public PartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Based on the input {@link BufferToFlush}s, generate the {@link Region}s 
accordingly. When the
+     * buffer's subpartition id changes or the buffer index changes, a new 
region is created. For
+     * example, the buffers are as follows(each buffer is represented by
+     * subpartitionId-bufferIndex). 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 
2-6, then 5 regions are
+     * generated. |1-1, 1-2, 1-3|2-1, 2-2|2-5|1-4, 1-5|2-6|.
+     *
+     * <p>Note that these buffers are logically partitioned by the region 
indexes logically, but
+     * they remain physically contiguous when flushing to disk.
+     *
+     * @param bufferToFlushes the buffers to be flushed
+     */
+    void generateRegionsBasedOnBuffers(List<BufferToFlush> bufferToFlushes) {
+        if (bufferToFlushes.isEmpty()) {
+            return;
+        }
+
+        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(bufferToFlushes);
+        synchronized (lock) {
+            convertedRegions.forEach(
+                    (subpartition, regions) ->
+                            
subpartitionRegions.get(subpartition).addAll(regions));
+        }
+    }
+
+    /**
+     * Get the {@link Region} of the specific subpartition.
+     *
+     * @param subpartitionId the specific subpartition id
+     * @param regionId the region id to get from the {@link PartitionFileIndex}
+     */
+    public Optional<Region> getRegion(int subpartitionId, int regionId) {
+        synchronized (lock) {
+            if (isReleased) {
+                return Optional.empty();
+            }
+            List<Region> currentRegions = 
subpartitionRegions.get(subpartitionId);
+            if (regionId < currentRegions.size()) {
+                return Optional.of(currentRegions.get(regionId));
+            }
+            return Optional.empty();
+        }
+    }
+
+    void release() {
+        synchronized (lock) {
+            subpartitionRegions.clear();
+            isReleased = true;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private static Map<Integer, List<Region>> convertToRegions(
+            List<BufferToFlush> bufferToFlushes) {
+        Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>();
+        Iterator<BufferToFlush> iterator = bufferToFlushes.iterator();
+        BufferToFlush firstBufferInRegion = iterator.next();
+        BufferToFlush lastBufferInRegion = firstBufferInRegion;
+
+        while (iterator.hasNext()) {
+            BufferToFlush currentBuffer = iterator.next();
+            if (currentBuffer.getSubpartitionId() != 
firstBufferInRegion.getSubpartitionId()
+                    || currentBuffer.getBufferIndex() != 
lastBufferInRegion.getBufferIndex() + 1) {
+                // the current buffer belongs to a new region, close the 
previous region
+                addInternalRegionToMap(
+                        firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+                firstBufferInRegion = currentBuffer;
+            }
+            lastBufferInRegion = currentBuffer;
+        }
+
+        addInternalRegionToMap(firstBufferInRegion, lastBufferInRegion, 
subpartitionRegionMap);
+        return subpartitionRegionMap;
+    }
+
+    private static void addInternalRegionToMap(
+            BufferToFlush firstBufferInRegion,
+            BufferToFlush lastBufferInRegion,
+            Map<Integer, List<Region>> subpartitionRegionMap) {
+        checkArgument(
+                firstBufferInRegion.getSubpartitionId() == 
lastBufferInRegion.getSubpartitionId());
+        checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
+
+        subpartitionRegionMap
+                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
+                .add(
+                        new Region(
+                                firstBufferInRegion.getFileOffset(),
+                                lastBufferInRegion.getBufferIndex()
+                                        - firstBufferInRegion.getBufferIndex()
+                                        + 1));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Classes
+    // ------------------------------------------------------------------------
+
+    /** Represents a buffer to be flushed. */
+    public static class BufferToFlush {

Review Comment:
   ```suggestion
       public static class FlushedBuffers {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartitions.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<List<Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public PartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Based on the input {@link BufferToFlush}s, generate the {@link Region}s 
accordingly. When the
+     * buffer's subpartition id changes or the buffer index changes, a new 
region is created. For
+     * example, the buffers are as follows(each buffer is represented by
+     * subpartitionId-bufferIndex). 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 
2-6, then 5 regions are
+     * generated. |1-1, 1-2, 1-3|2-1, 2-2|2-5|1-4, 1-5|2-6|.
+     *
+     * <p>Note that these buffers are logically partitioned by the region 
indexes logically, but
+     * they remain physically contiguous when flushing to disk.
+     *
+     * @param bufferToFlushes the buffers to be flushed
+     */

Review Comment:
   ```suggestion
       /**
        * Add buffers to the index.
        *
        * @param buffers to be added. Note, the provided buffers are required 
to be physically
        *     consecutive and in the same order as in the file.
        */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergePartitionFile.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import java.nio.file.Path;
+
+/**
+ * The partition file in the producer-merge mode. In this mode, the shuffle 
data is written in the
+ * producer side, the consumer side need to read multiple producers to get its 
partition data.
+ */
+public class ProducerMergePartitionFile {

Review Comment:
   ```suggestion
   public class ProducerMergedPartitionFile {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link PartitionFileIndex} is responsible for storing the indexes of 
data files generated
+ * during the partition file write process and utilized during partition file 
reads. In order to
+ * simplify the representation of consecutive buffers that belong to a single 
subpartition within a
+ * file, these indexes are encapsulated into a {@link Region}. During the 
partition file write
+ * process, the {@link Region}s are generated based on the buffers. During 
partition file reads, the
+ * {@link Region} is used to retrieve consecutive buffers that belong to a 
single subpartition.
+ */
+public class PartitionFileIndex {
+
+    /**
+     * The regions belonging to each subpartitions.
+     *
+     * <p>Note that the field can be accessed by the writing and reading IO 
thread, so the lock is
+     * to ensure the thread safety.
+     */
+    @GuardedBy("lock")
+    private final List<List<Region>> subpartitionRegions;
+
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    private final Object lock = new Object();
+
+    public PartitionFileIndex(int numSubpartitions) {
+        this.subpartitionRegions = new ArrayList<>();
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
+            subpartitionRegions.add(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Based on the input {@link BufferToFlush}s, generate the {@link Region}s 
accordingly. When the
+     * buffer's subpartition id changes or the buffer index changes, a new 
region is created. For
+     * example, the buffers are as follows(each buffer is represented by
+     * subpartitionId-bufferIndex). 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 
2-6, then 5 regions are
+     * generated. |1-1, 1-2, 1-3|2-1, 2-2|2-5|1-4, 1-5|2-6|.
+     *
+     * <p>Note that these buffers are logically partitioned by the region 
indexes logically, but
+     * they remain physically contiguous when flushing to disk.
+     *
+     * @param bufferToFlushes the buffers to be flushed
+     */
+    void generateRegionsBasedOnBuffers(List<BufferToFlush> bufferToFlushes) {

Review Comment:
   ```suggestion
       void addBuffers(List<BufferToFlush> buffers) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergePartitionFile.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import java.nio.file.Path;
+
+/**
+ * The partition file in the producer-merge mode. In this mode, the shuffle 
data is written in the
+ * producer side, the consumer side need to read multiple producers to get its 
partition data.
+ */
+public class ProducerMergePartitionFile {

Review Comment:
   Same for the reader & writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to