This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1786e2ddfd1a1df5e907f5719138f63e819cb1a3
Author: Yuxin Tan <[email protected]>
AuthorDate: Mon Jul 17 15:09:26 2023 +0800

    [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file
---
 .../hybrid/index/FileRegionWriteReadUtils.java     |  42 ++++++
 .../tiered/file/ProducerMergedPartitionFile.java   |   2 +
 .../file/ProducerMergedPartitionFileIndex.java     | 149 ++++++++++++++-------
 .../file/ProducerMergedPartitionFileReader.java    |   9 +-
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  19 ++-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   7 +
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |  29 ++++
 .../file/ProducerMergedPartitionFileIndexTest.java |  15 ++-
 .../ProducerMergedPartitionFileReaderTest.java     |  10 +-
 .../ProducerMergedPartitionFileWriterTest.java     |   2 +
 .../TestingProducerMergedPartitionFileIndex.java   |  54 +++++++-
 11 files changed, 275 insertions(+), 63 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
index 443bdbf863e..1ca689d0c2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.index;
 
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.FixedSizeRegion;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -105,4 +106,45 @@ public class FileRegionWriteReadUtils {
         }
         return new InternalRegion(firstBufferIndex, firstBufferOffset, 
numBuffers, released);
     }
+
+    /**
+     * Write {@link FixedSizeRegion} to {@link FileChannel}.
+     *
+     * <p>Note that this type of region's length is fixed: exactly {@link FixedSizeRegion#REGION_SIZE} bytes are written.
+     *
+     * @param channel the file's channel to write.
+     * @param regionBuffer the buffer to write {@link FixedSizeRegion}'s 
header; its capacity must be at least {@link FixedSizeRegion#REGION_SIZE}.
+     * @param region the region to be written to channel.
+     */
+    public static void writeFixedSizeRegionToFile(
+            FileChannel channel, ByteBuffer regionBuffer, 
FileDataIndexRegionHelper.Region region)
+            throws IOException {
+        regionBuffer.clear();
+        regionBuffer.putInt(region.getFirstBufferIndex());
+        regionBuffer.putInt(region.getNumBuffers());
+        regionBuffer.putLong(region.getRegionFileOffset());
+        regionBuffer.flip();
+        BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.remaining(), 
regionBuffer);
+    }
+
+    /**
+     * Read {@link FixedSizeRegion} from {@link FileChannel}.
+     *
+     * <p>Note that this type of region's length is fixed: exactly {@link FixedSizeRegion#REGION_SIZE} bytes are read.
+     *
+     * @param channel the channel to read.
+     * @param regionBuffer the buffer to read {@link FixedSizeRegion}'s header; its capacity must be at least {@link FixedSizeRegion#REGION_SIZE}.
+     * @param fileOffset the file offset to start reading from.
+     * @return the {@link FixedSizeRegion} read from this channel.
+     */
+    public static FixedSizeRegion readFixedSizeRegionFromFile(
+            FileChannel channel, ByteBuffer regionBuffer, long fileOffset) 
throws IOException {
+        regionBuffer.clear().limit(FixedSizeRegion.REGION_SIZE);
+        BufferReaderWriterUtil.readByteBufferFully(channel, regionBuffer, 
fileOffset);
+        regionBuffer.flip();
+        int firstBufferIndex = regionBuffer.getInt();
+        int numBuffers = regionBuffer.getInt();
+        long firstBufferOffset = regionBuffer.getLong();
+        return new FixedSizeRegion(firstBufferIndex, firstBufferOffset, 
numBuffers);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java
index fef384286b7..00aa0344534 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java
@@ -28,6 +28,8 @@ public class ProducerMergedPartitionFile {
 
     public static final String DATA_FILE_SUFFIX = ".tier-storage.data";
 
+    public static final String INDEX_FILE_SUFFIX = ".tier-storage.index"; // appended to the data file base path to name the region index file
+
     public static ProducerMergedPartitionFileWriter createPartitionFileWriter(
             Path dataFilePath, ProducerMergedPartitionFileIndex 
partitionFileIndex) {
         return new ProducerMergedPartitionFileWriter(dataFilePath, 
partitionFileIndex);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
index 34d86a2caf6..01cc9086d56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
@@ -18,18 +18,28 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexCache;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManagerImpl;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.TreeMap;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -39,7 +49,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>For efficiency, buffers from the same subpartition that are both 
logically (i.e. index in the
  * subpartition) and physically (i.e. offset in the file) consecutive are 
combined into a {@link
- * Region}.
+ * FixedSizeRegion}.
  *
  * <pre>For example, the following buffers (indicated by 
subpartitionId-bufferIndex):
  *   1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6
@@ -49,6 +59,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class ProducerMergedPartitionFileIndex {
 
+    private final Path indexFilePath;
+
     /**
      * The regions belonging to each subpartitions.
      *
@@ -56,18 +68,26 @@ public class ProducerMergedPartitionFileIndex {
      * to ensure the thread safety.
      */
     @GuardedBy("lock")
-    private final List<TreeMap<Integer, Region>> subpartitionRegions;
-
-    @GuardedBy("lock")
-    private boolean isReleased;
+    private final FileDataIndexCache<FixedSizeRegion> indexCache;
 
     private final Object lock = new Object();
 
-    public ProducerMergedPartitionFileIndex(int numSubpartitions) {
-        this.subpartitionRegions = new ArrayList<>();
-        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
-            subpartitionRegions.add(new TreeMap<>());
-        }
+    public ProducerMergedPartitionFileIndex(
+            int numSubpartitions,
+            Path indexFilePath,
+            int regionGroupSizeInBytes,
+            long numRetainedInMemoryRegionsMax) {
+        this.indexFilePath = indexFilePath;
+        this.indexCache =
+                new FileDataIndexCache<>(
+                        numSubpartitions,
+                        indexFilePath,
+                        numRetainedInMemoryRegionsMax,
+                        new FileDataIndexSpilledRegionManagerImpl.Factory<>(
+                                regionGroupSizeInBytes,
+                                numRetainedInMemoryRegionsMax,
+                                FixedSizeRegion.REGION_SIZE,
+                                
new ProducerMergedPartitionFileDataIndexRegionHelper()));
     }
 
     /**
@@ -81,48 +101,34 @@ public class ProducerMergedPartitionFileIndex {
             return;
         }
 
-        Map<Integer, List<Region>> convertedRegions = 
convertToRegions(buffers);
+        Map<Integer, List<FixedSizeRegion>> convertedRegions = 
convertToRegions(buffers);
         synchronized (lock) {
-            convertedRegions.forEach(
-                    (subpartition, regions) -> {
-                        Map<Integer, Region> regionMap = 
subpartitionRegions.get(subpartition);
-                        for (Region region : regions) {
-                            regionMap.put(region.getFirstBufferIndex(), 
region);
-                        }
-                    });
+            convertedRegions.forEach(indexCache::put);
         }
     }
 
     /**
-     * Get the subpartition's {@link Region} containing the specific buffer 
index.
+     * Get the subpartition's {@link FixedSizeRegion} containing the specific 
buffer index.
      *
      * @param subpartitionId the subpartition id
      * @param bufferIndex the buffer index
      * @return the region containing the buffer index, or return emtpy if the 
region is not found.
      */
-    Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int 
bufferIndex) {
+    Optional<FixedSizeRegion> getRegion(
+            TieredStorageSubpartitionId subpartitionId, int bufferIndex) {
         synchronized (lock) {
-            if (isReleased) {
-                return Optional.empty();
-            }
-            Map.Entry<Integer, Region> regionEntry =
-                    subpartitionRegions
-                            .get(subpartitionId.getSubpartitionId())
-                            .floorEntry(bufferIndex);
-            if (regionEntry == null) {
-                return Optional.empty();
-            }
-            Region region = regionEntry.getValue();
-            return bufferIndex < region.getFirstBufferIndex() + 
region.numBuffers
-                    ? Optional.of(region)
-                    : Optional.empty();
+            return indexCache.get(subpartitionId.getSubpartitionId(), 
bufferIndex);
         }
     }
 
     void release() {
         synchronized (lock) {
-            subpartitionRegions.clear();
-            isReleased = true;
+            try {
+                indexCache.close();
+                IOUtils.deleteFileQuietly(indexFilePath);
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e);
+            }
         }
     }
 
@@ -130,8 +136,9 @@ public class ProducerMergedPartitionFileIndex {
     //  Internal Methods
     // ------------------------------------------------------------------------
 
-    private static Map<Integer, List<Region>> 
convertToRegions(List<FlushedBuffer> buffers) {
-        Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>();
+    private static Map<Integer, List<FixedSizeRegion>> convertToRegions(
+            List<FlushedBuffer> buffers) {
+        Map<Integer, List<FixedSizeRegion>> subpartitionRegionMap = new 
HashMap<>();
         Iterator<FlushedBuffer> iterator = buffers.iterator();
         FlushedBuffer firstBufferInRegion = iterator.next();
         FlushedBuffer lastBufferInRegion = firstBufferInRegion;
@@ -155,7 +162,7 @@ public class ProducerMergedPartitionFileIndex {
     private static void addRegionToMap(
             FlushedBuffer firstBufferInRegion,
             FlushedBuffer lastBufferInRegion,
-            Map<Integer, List<Region>> subpartitionRegionMap) {
+            Map<Integer, List<FixedSizeRegion>> subpartitionRegionMap) {
         checkArgument(
                 firstBufferInRegion.getSubpartitionId() == 
lastBufferInRegion.getSubpartitionId());
         checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
@@ -163,7 +170,7 @@ public class ProducerMergedPartitionFileIndex {
         subpartitionRegionMap
                 .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
                 .add(
-                        new Region(
+                        new FixedSizeRegion(
                                 firstBufferInRegion.getBufferIndex(),
                                 firstBufferInRegion.getFileOffset(),
                                 lastBufferInRegion.getBufferIndex()
@@ -205,6 +212,38 @@ public class ProducerMergedPartitionFileIndex {
         }
     }
 
+    /**
+     * The implementation of {@link FileDataIndexRegionHelper} for writing a 
region to the file or
+     * reading a region from the file.
+     *
+     * <p>Note that this type of region's length is fixed.
+     */
+    static class ProducerMergedPartitionFileDataIndexRegionHelper
+            implements FileDataIndexRegionHelper<FixedSizeRegion> {
+
+        /** Reusable buffer used to read and write the immutable part of 
region; not thread-safe, so each index must use its own helper instance. */
+        private final ByteBuffer regionBuffer =
+                allocateAndConfigureBuffer(FixedSizeRegion.REGION_SIZE);
+
+        static final ProducerMergedPartitionFileDataIndexRegionHelper INSTANCE 
=
+                new ProducerMergedPartitionFileDataIndexRegionHelper();
+
+        private ProducerMergedPartitionFileDataIndexRegionHelper() {}
+
+        @Override
+        public void writeRegionToFile(FileChannel channel, FixedSizeRegion 
region)
+                throws IOException {
+            FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, 
regionBuffer, region);
+        }
+
+        @Override
+        public FixedSizeRegion readRegionFromFile(FileChannel channel, long 
fileOffset)
+                throws IOException {
+            return FileRegionWriteReadUtils.readFixedSizeRegionFromFile(
+                    channel, regionBuffer, fileOffset);
+        }
+    }
+
     /**
      * Represents a series of buffers that are:
      *
@@ -213,8 +252,12 @@ public class ProducerMergedPartitionFileIndex {
      *   <li>Logically (i.e. buffer index) consecutive
      *   <li>Physically (i.e. offset in the file) consecutive
      * </ul>
+     *
+     * <p>Note that the region has a fixed size.
      */
-    static class Region {
+    public static class FixedSizeRegion implements 
FileDataIndexRegionHelper.Region {
+
+        public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
 
         /** The buffer index of first buffer. */
         private final int firstBufferIndex;
@@ -225,21 +268,35 @@ public class ProducerMergedPartitionFileIndex {
         /** The number of buffers that the region contains. */
         private final int numBuffers;
 
-        Region(int firstBufferIndex, long regionFileOffset, int numBuffers) {
+        public FixedSizeRegion(int firstBufferIndex, long regionFileOffset, 
int numBuffers) {
             this.firstBufferIndex = firstBufferIndex;
             this.regionFileOffset = regionFileOffset;
             this.numBuffers = numBuffers;
         }
 
-        long getRegionFileOffset() {
+        @Override
+        public boolean containBuffer(int bufferIndex) {
+            return bufferIndex >= firstBufferIndex && bufferIndex < 
firstBufferIndex + numBuffers;
+        }
+
+        /** Returns {@link #REGION_SIZE} plus the number of buffers; note that only 
REGION_SIZE bytes are serialized per region. */
+        @Override
+        public int getSize() {
+            return REGION_SIZE + numBuffers;
+        }
+
+        @Override
+        public long getRegionFileOffset() {
             return regionFileOffset;
         }
 
-        int getNumBuffers() {
+        @Override
+        public int getNumBuffers() {
             return numBuffers;
         }
 
-        int getFirstBufferIndex() {
+        @Override
+        public int getFirstBufferIndex() {
             return firstBufferIndex;
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
index d82aa78659f..00b8bd49db4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
@@ -40,7 +40,6 @@ import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
 import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.Region;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -192,7 +191,8 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
             Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean 
removeKey) {
         BufferOffsetCache bufferOffsetCache = 
bufferOffsetCaches.remove(cacheKey);
         if (bufferOffsetCache == null) {
-            Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, 
cacheKey.f1);
+            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
+                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1); // empty if no region contains the buffer index
             return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, 
region));
         } else {
             if (removeKey) {
@@ -211,13 +211,14 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
      */
     private class BufferOffsetCache {
 
-        private final Region region;
+        private final ProducerMergedPartitionFileIndex.FixedSizeRegion region;
 
         private long fileOffset;
 
         private int nextBufferIndex;
 
-        private BufferOffsetCache(int bufferIndex, Region region) {
+        private BufferOffsetCache(
+                int bufferIndex, 
ProducerMergedPartitionFileIndex.FixedSizeRegion region) {
             this.nextBufferIndex = bufferIndex;
             this.region = region;
             moveFileOffsetToBuffer(bufferIndex);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 65288fe01a6..fad6677f4b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX;
 
 /** The implementation of {@link TierFactory} for disk tier. */
 public class DiskTierFactory implements TierFactory {
@@ -51,11 +52,21 @@ public class DiskTierFactory implements TierFactory {
 
     private final float minReservedDiskSpaceFraction;
 
+    private final int regionGroupSizeInBytes; // forwarded to each ProducerMergedPartitionFileIndex
+
+    private final long numRetainedInMemoryRegionsMax; // forwarded to each ProducerMergedPartitionFileIndex
+
     public DiskTierFactory(
-            int numBytesPerSegment, int bufferSizeBytes, float 
minReservedDiskSpaceFraction) {
+            int numBytesPerSegment,
+            int bufferSizeBytes,
+            float minReservedDiskSpaceFraction,
+            int regionGroupSizeInBytes,
+            long numRetainedInMemoryRegionsMax) {
         this.numBytesPerSegment = numBytesPerSegment;
         this.bufferSizeBytes = bufferSizeBytes;
         this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
+        this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+        this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
     }
 
     @Override
@@ -78,7 +89,11 @@ public class DiskTierFactory implements TierFactory {
             Duration bufferRequestTimeout,
             int maxBufferReadAhead) {
         ProducerMergedPartitionFileIndex partitionFileIndex =
-                new ProducerMergedPartitionFileIndex(isBroadcastOnly ? 1 : 
numSubpartitions);
+                new ProducerMergedPartitionFileIndex(
+                        isBroadcastOnly ? 1 : numSubpartitions,
+                        Paths.get(dataFileBasePath + INDEX_FILE_SUFFIX), // same base path as the data file
+                        regionGroupSizeInBytes,
+                        numRetainedInMemoryRegionsMax);
         Path dataFilePath = Paths.get(dataFileBasePath + DATA_FILE_SUFFIX);
         ProducerMergedPartitionFileWriter partitionFileWriter =
                 ProducerMergedPartitionFile.createPartitionFileWriter(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index bbb88314b4c..5b0629bd1a5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -116,6 +117,12 @@ public class HybridShuffleTestUtils {
         return regions;
     }
 
+    public static FileDataIndexRegionHelper.Region createSingleFixedSizeRegion(
+            int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion) {
+        return new ProducerMergedPartitionFileIndex.FixedSizeRegion(
+                firstBufferIndex, firstBufferOffset, numBuffersPerRegion); // firstBufferOffset becomes the region's file offset
+    }
+
     public static void assertRegionEquals(
             FileDataIndexRegionHelper.Region expected, 
FileDataIndexRegionHelper.Region region) {
         
assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
index fc2cf847002..8e04f24186e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -34,6 +35,7 @@ import java.util.UUID;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleFixedSizeRegion;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -78,6 +80,33 @@ class FileRegionWriteReadUtilsTest {
         assertRegionEquals(readRegion, region);
     }
 
+    @Test
+    void testReadPrematureEndOfFileForFixedSizeRegion(@TempDir Path tmpPath) 
throws Exception {
+        FileChannel channel = tmpFileChannel(tmpPath);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(ProducerMergedPartitionFileIndex.FixedSizeRegion.REGION_SIZE);
+        FileRegionWriteReadUtils.writeFixedSizeRegionToFile(
+                channel, buffer, createSingleFixedSizeRegion(0, 0L, 1));
+        channel.truncate(channel.position() - 1); // drop the last byte so the stored region is incomplete
+        buffer.flip();
+        assertThatThrownBy(
+                        () ->
+                                
FileRegionWriteReadUtils.readFixedSizeRegionFromFile(
+                                        channel, buffer, 0L))
+                .isInstanceOf(IOException.class);
+    }
+
+    @Test
+    void testWriteAndReadFixedSizeRegion(@TempDir Path tmpPath) throws 
Exception {
+        FileChannel channel = tmpFileChannel(tmpPath);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(ProducerMergedPartitionFileIndex.FixedSizeRegion.REGION_SIZE);
+        FileDataIndexRegionHelper.Region region = 
createSingleFixedSizeRegion(10, 100L, 1);
+        FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, buffer, 
region);
+        buffer.flip();
+        ProducerMergedPartitionFileIndex.FixedSizeRegion readRegion =
+                FileRegionWriteReadUtils.readFixedSizeRegionFromFile(channel, 
buffer, 0L);
+        assertRegionEquals(readRegion, region);
+    }
+
     private static FileChannel tmpFileChannel(Path tempPath) throws 
IOException {
         return FileChannel.open(
                 
Files.createFile(tempPath.resolve(UUID.randomUUID().toString())),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
index 74d96f1c6d1..615e2235643 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
@@ -21,8 +21,11 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -35,13 +38,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link ProducerMergedPartitionFileIndex}. */
 class ProducerMergedPartitionFileIndexTest {
 
+    private Path indexFilePath;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) {
+        this.indexFilePath = tempDir.resolve(".index");
+    }
+
     @Test
     void testAddBufferAndGetRegion() {
         int numSubpartitions = 5;
         int numBuffersPerSubpartition = 10;
 
         ProducerMergedPartitionFileIndex partitionFileIndex =
-                new ProducerMergedPartitionFileIndex(numSubpartitions);
+                new ProducerMergedPartitionFileIndex(
+                        numSubpartitions, indexFilePath, 256, Long.MAX_VALUE); // regionGroupSizeInBytes, numRetainedInMemoryRegionsMax
 
         List<ProducerMergedPartitionFileIndex.FlushedBuffer> flushedBuffers = 
new ArrayList<>();
         Tuple2<Integer, Integer> numExpectedRegionsAndMaxBufferIndex =
@@ -91,7 +102,7 @@ class ProducerMergedPartitionFileIndexTest {
         for (int i = 0; i < numSubpartitions; i++) {
             subpartitionFirstBufferIndexes.add(new HashSet<>());
             for (int j = 0; j <= maxBufferIndex; j++) {
-                Optional<ProducerMergedPartitionFileIndex.Region> region =
+                Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
region =
                         partitionFileIndex.getRegion(new 
TieredStorageSubpartitionId(i), j);
                 if (region.isPresent()) {
                     
subpartitionFirstBufferIndexes.get(i).add(region.get().getFirstBufferIndex());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
index 7d6f3b25da0..fc29e07c045 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
@@ -58,6 +58,8 @@ class ProducerMergedPartitionFileReaderTest {
 
     private static final String DEFAULT_TEST_FILE_NAME = "testFile";
 
+    private static final String DEFAULT_TEST_INDEX_NAME = "testIndex";
+
     private static final TieredStoragePartitionId DEFAULT_PARTITION_ID =
             TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
 
@@ -72,8 +74,10 @@ class ProducerMergedPartitionFileReaderTest {
 
     @BeforeEach
     void before() throws ExecutionException, InterruptedException {
+        Path testIndexPath = new File(tempFolder.toFile(), 
DEFAULT_TEST_INDEX_NAME).toPath();
         ProducerMergedPartitionFileIndex partitionFileIndex =
-                new ProducerMergedPartitionFileIndex(DEFAULT_NUM_SUBPARTITION);
+                new ProducerMergedPartitionFileIndex(
+                        DEFAULT_NUM_SUBPARTITION, testIndexPath, 256, 
Long.MAX_VALUE); // regionGroupSizeInBytes, numRetainedInMemoryRegionsMax
         testFilePath = new File(tempFolder.toFile(), 
DEFAULT_TEST_FILE_NAME).toPath();
         ProducerMergedPartitionFileWriter partitionFileWriter =
                 new ProducerMergedPartitionFileWriter(testFilePath, 
partitionFileIndex);
@@ -133,11 +137,13 @@ class ProducerMergedPartitionFileReaderTest {
         AtomicInteger indexQueryTime = new AtomicInteger(0);
         TestingProducerMergedPartitionFileIndex partitionFileIndex =
                 new TestingProducerMergedPartitionFileIndex.Builder()
+                        .setIndexFilePath(new File(tempFolder.toFile(), 
"test-Index").toPath())
                         .setGetRegionFunction(
                                 (subpartitionId, integer) -> {
                                     indexQueryTime.incrementAndGet();
                                     return Optional.of(
-                                            new 
ProducerMergedPartitionFileIndex.Region(0, 0, 2));
+                                            new 
ProducerMergedPartitionFileIndex.FixedSizeRegion(
+                                                    0, 0, 2));
                                 })
                         .build();
         partitionFileReader =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java
index ddde5377567..be93b80b72b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java
@@ -50,6 +50,7 @@ class ProducerMergedPartitionFileWriterTest {
         AtomicInteger receivedBuffers = new AtomicInteger(0);
         TestingProducerMergedPartitionFileIndex partitionFileIndex =
                 new TestingProducerMergedPartitionFileIndex.Builder()
+                        .setIndexFilePath(new File(tempFolder.toFile(), 
"testIndex").toPath())
                         .setAddBuffersConsumer(buffers -> 
receivedBuffers.getAndAdd(buffers.size()))
                         .build();
 
@@ -83,6 +84,7 @@ class ProducerMergedPartitionFileWriterTest {
         AtomicBoolean isReleased = new AtomicBoolean(false);
         TestingProducerMergedPartitionFileIndex partitionFileIndex =
                 new TestingProducerMergedPartitionFileIndex.Builder()
+                        .setIndexFilePath(new File(tempFolder.toFile(), 
"testIndex").toPath())
                         .setReleaseRunnable(() -> isReleased.set(true))
                         .build();
         ProducerMergedPartitionFileWriter partitionFileWriter =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java
index 74a4bc8708d..52859f3b21c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiFunction;
@@ -30,17 +31,25 @@ public class TestingProducerMergedPartitionFileIndex 
extends ProducerMergedParti
 
     private final Consumer<List<FlushedBuffer>> addBuffersConsumer;
 
-    private final BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<Region>>
+    private final BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<FixedSizeRegion>>
             getRegionFunction;
 
     private final Runnable releaseRunnable;
 
     private TestingProducerMergedPartitionFileIndex(
             int numSubpartitions,
+            Path indexFilePath,
+            int regionGroupSizeInBytes,
+            long numRetainedInMemoryRegionsMax,
             Consumer<List<FlushedBuffer>> addBuffersConsumer,
-            BiFunction<TieredStorageSubpartitionId, Integer, Optional<Region>> 
getRegionFunction,
+            BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<FixedSizeRegion>>
+                    getRegionFunction,
             Runnable releaseRunnable) {
-        super(numSubpartitions);
+        super(
+                numSubpartitions,
+                indexFilePath,
+                regionGroupSizeInBytes,
+                numRetainedInMemoryRegionsMax);
         this.addBuffersConsumer = addBuffersConsumer;
         this.getRegionFunction = getRegionFunction;
         this.releaseRunnable = releaseRunnable;
@@ -52,7 +61,8 @@ public class TestingProducerMergedPartitionFileIndex extends 
ProducerMergedParti
     }
 
     @Override
-    Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int 
bufferIndex) {
+    Optional<FixedSizeRegion> getRegion(
+            TieredStorageSubpartitionId subpartitionId, int bufferIndex) {
         return getRegionFunction.apply(subpartitionId, bufferIndex);
     }
 
@@ -66,9 +76,15 @@ public class TestingProducerMergedPartitionFileIndex extends 
ProducerMergedParti
 
         private int numSubpartitions = 1;
 
+        private Path indexFilePath = null;
+
+        private int regionGroupSizeInBytes = 256;
+
+        private long numRetainedInMemoryRegionsMax = Long.MAX_VALUE;
+
         private Consumer<List<FlushedBuffer>> addBuffersConsumer = 
flushedBuffers -> {};
 
-        private BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<Region>>
+        private BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<FixedSizeRegion>>
                 getRegionFunction = (tieredStorageSubpartitionId, integer) -> 
Optional.empty();
 
         private Runnable releaseRunnable = () -> {};
@@ -81,6 +97,24 @@ public class TestingProducerMergedPartitionFileIndex extends 
ProducerMergedParti
             return this;
         }
 
+        public TestingProducerMergedPartitionFileIndex.Builder 
setIndexFilePath(
+                Path indexFilePath) {
+            this.indexFilePath = indexFilePath;
+            return this;
+        }
+
+        public TestingProducerMergedPartitionFileIndex.Builder 
setRegionGroupSizeInBytes(
+                int regionGroupSizeInBytes) {
+            this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+            return this;
+        }
+
+        public TestingProducerMergedPartitionFileIndex.Builder 
setNumRetainedInMemoryRegionsMax(
+                long numRetainedInMemoryRegionsMax) {
+            this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
+            return this;
+        }
+
         public TestingProducerMergedPartitionFileIndex.Builder 
setAddBuffersConsumer(
                 Consumer<List<FlushedBuffer>> addBuffersConsumer) {
             this.addBuffersConsumer = addBuffersConsumer;
@@ -88,7 +122,7 @@ public class TestingProducerMergedPartitionFileIndex extends 
ProducerMergedParti
         }
 
         public TestingProducerMergedPartitionFileIndex.Builder 
setGetRegionFunction(
-                BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<Region>>
+                BiFunction<TieredStorageSubpartitionId, Integer, 
Optional<FixedSizeRegion>>
                         getRegionFunction) {
             this.getRegionFunction = getRegionFunction;
             return this;
@@ -102,7 +136,13 @@ public class TestingProducerMergedPartitionFileIndex 
extends ProducerMergedParti
 
         public TestingProducerMergedPartitionFileIndex build() {
             return new TestingProducerMergedPartitionFileIndex(
-                    numSubpartitions, addBuffersConsumer, getRegionFunction, 
releaseRunnable);
+                    numSubpartitions,
+                    indexFilePath,
+                    regionGroupSizeInBytes,
+                    numRetainedInMemoryRegionsMax,
+                    addBuffersConsumer,
+                    getRegionFunction,
+                    releaseRunnable);
         }
     }
 }


Reply via email to