This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0ef0dbd4077161d47b29cc6130a2da3bc2a8048b
Author: Yuxin Tan <[email protected]>
AuthorDate: Mon Jul 17 15:02:04 2023 +0800

    [FLINK-32576][network] Extract file data index cache for hybrid shuffle
---
 .../generated/all_taskmanager_network_section.html |   4 +-
 .../netty_shuffle_environment_configuration.html   |   4 +-
 .../NettyShuffleEnvironmentOptions.java            |  10 +-
 .../io/network/NettyShuffleServiceFactory.java     |   2 +-
 .../network/partition/ResultPartitionFactory.java  |   8 +-
 .../partition/hybrid/HsFileDataIndexImpl.java      | 125 ++++---
 .../HsFileDataIndexSpilledRegionManagerImpl.java   | 402 --------------------
 .../partition/hybrid/HsResultPartition.java        |   2 +-
 .../hybrid/HybridShuffleConfiguration.java         |  20 +-
 .../FileDataIndexCache.java}                       |  65 ++--
 .../hybrid/index/FileDataIndexRegionHelper.java    |  92 +++++
 .../FileDataIndexSpilledRegionManager.java}        |  17 +-
 .../FileDataIndexSpilledRegionManagerImpl.java     | 415 +++++++++++++++++++++
 .../FileRegionWriteReadUtils.java}                 |  26 +-
 .../NettyShuffleEnvironmentConfiguration.java      |  13 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |  55 ++-
 .../TestingFileDataIndexSpilledRegionManager.java  |  38 +-
 .../FileDataIndexCacheTest.java}                   |  72 ++--
 ...FileDataIndexSpilledRegionManagerImplTest.java} | 142 ++++---
 .../FileRegionWriteReadUtilsTest.java}             |  36 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   | 176 +++++++++
 .../index/TestingFileDataIndexRegionHelper.java    |  86 +++++
 22 files changed, 1118 insertions(+), 692 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html 
b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index b82a5bee720..e71647520cd 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -39,10 +39,10 @@
             <td>Controls the max number of hybrid retained regions in 
memory.</td>
         </tr>
         <tr>
-            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td>
+            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-region-group-size</h5></td>
             <td style="word-wrap: break-word;">1024</td>
             <td>Integer</td>
-            <td>Controls the segment size(in bytes) of hybrid spilled file 
data index.</td>
+            <td>Controls the region group size(in bytes) of hybrid spilled 
file data index. </td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 9730839c0c4..6cd4621656a 100644
--- 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -57,10 +57,10 @@
             <td>Controls the max number of hybrid retained regions in 
memory.</td>
         </tr>
         <tr>
-            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td>
+            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-region-group-size</h5></td>
             <td style="word-wrap: break-word;">1024</td>
             <td>Integer</td>
-            <td>Controls the segment size(in bytes) of hybrid spilled file 
data index.</td>
+            <td>Controls the region group size(in bytes) of hybrid spilled 
file data index. </td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 1af9deadf35..d1fbe2474ce 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -315,14 +315,16 @@ public class NettyShuffleEnvironmentOptions {
                                     // this raw value must be changed 
correspondingly
                                     
"taskmanager.memory.framework.off-heap.batch-shuffle.size"));
 
-    /** Segment size of hybrid spilled file data index. */
+    /** Region group size of hybrid spilled file data index. */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
-    public static final ConfigOption<Integer> 
HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE =
-            key("taskmanager.network.hybrid-shuffle.spill-index-segment-size")
+    public static final ConfigOption<Integer> 
HYBRID_SHUFFLE_SPILLED_INDEX_REGION_GROUP_SIZE =
+            
key("taskmanager.network.hybrid-shuffle.spill-index-region-group-size")
                     .intType()
                     .defaultValue(1024)
+                    .withDeprecatedKeys(
+                            
"taskmanager.network.hybrid-shuffle.spill-index-segment-size")
                     .withDescription(
-                            "Controls the segment size(in bytes) of hybrid 
spilled file data index.");
+                            "Controls the region group size(in bytes) of 
hybrid spilled file data index. ");
 
     /** Max number of hybrid retained regions in memory. */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index b20f5820e4f..de8ba5f120b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -211,7 +211,7 @@ public class NettyShuffleServiceFactory
                         config.sortShuffleMinParallelism(),
                         config.isSSLEnabled(),
                         config.getMaxOverdraftBuffersPerGate(),
-                        config.getHybridShuffleSpilledIndexSegmentSize(),
+                        config.getHybridShuffleSpilledIndexRegionGroupSize(),
                         
config.getHybridShuffleNumRetainedInMemoryRegionsMax());
 
         SingleInputGateFactory singleInputGateFactory =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 069536e8664..4642312da87 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -75,7 +75,7 @@ public class ResultPartitionFactory {
 
     private final int sortShuffleMinParallelism;
 
-    private final int hybridShuffleSpilledIndexSegmentSize;
+    private final int hybridShuffleSpilledIndexRegionGroupSize;
 
     private final long hybridShuffleNumRetainedInMemoryRegionsMax;
 
@@ -100,7 +100,7 @@ public class ResultPartitionFactory {
             int sortShuffleMinParallelism,
             boolean sslEnabled,
             int maxOverdraftBuffersPerGate,
-            int hybridShuffleSpilledIndexSegmentSize,
+            int hybridShuffleSpilledIndexRegionGroupSize,
             long hybridShuffleNumRetainedInMemoryRegionsMax) {
 
         this.partitionManager = partitionManager;
@@ -119,7 +119,7 @@ public class ResultPartitionFactory {
         this.sortShuffleMinParallelism = sortShuffleMinParallelism;
         this.sslEnabled = sslEnabled;
         this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
-        this.hybridShuffleSpilledIndexSegmentSize = 
hybridShuffleSpilledIndexSegmentSize;
+        this.hybridShuffleSpilledIndexRegionGroupSize = 
hybridShuffleSpilledIndexRegionGroupSize;
         this.hybridShuffleNumRetainedInMemoryRegionsMax =
                 hybridShuffleNumRetainedInMemoryRegionsMax;
     }
@@ -261,7 +261,7 @@ public class ResultPartitionFactory {
                         resultPartitionType == ResultPartitionType.HYBRID_FULL
                                 ? 
HybridShuffleConfiguration.SpillingStrategyType.FULL
                                 : 
HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE)
-                
.setSpilledIndexSegmentSize(hybridShuffleSpilledIndexSegmentSize)
+                
.setRegionGroupSizeInBytes(hybridShuffleSpilledIndexRegionGroupSize)
                 
.setNumRetainedInMemoryRegionsMax(hybridShuffleNumRetainedInMemoryRegionsMax)
                 .build();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index c832d715329..4b4221225cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexCache;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManagerImpl;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Default implementation of {@link HsFileDataIndex}. */
@@ -41,25 +48,26 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public class HsFileDataIndexImpl implements HsFileDataIndex {
 
     @GuardedBy("lock")
-    private final HsFileDataIndexCache indexCache;
+    private final FileDataIndexCache<InternalRegion> indexCache;
 
-    /**
-     * {@link HsFileDataIndexCache} is not thread-safe, any access to it needs 
to hold this lock.
-     */
+    /** {@link FileDataIndexCache} is not thread-safe, any access to it needs 
to hold this lock. */
     private final Object lock = new Object();
 
     public HsFileDataIndexImpl(
             int numSubpartitions,
             Path indexFilePath,
-            int spilledIndexSegmentSize,
+            int regionGroupSizeInBytes,
             long numRetainedInMemoryRegionsMax) {
         this.indexCache =
-                new HsFileDataIndexCache(
+                new FileDataIndexCache<>(
                         numSubpartitions,
                         indexFilePath,
                         numRetainedInMemoryRegionsMax,
-                        new HsFileDataIndexSpilledRegionManagerImpl.Factory(
-                                spilledIndexSegmentSize, 
numRetainedInMemoryRegionsMax));
+                        new FileDataIndexSpilledRegionManagerImpl.Factory<>(
+                                regionGroupSizeInBytes,
+                                numRetainedInMemoryRegionsMax,
+                                InternalRegion.HEADER_SIZE,
+                                HsFileDataIndexRegionHelper.INSTANCE));
     }
 
     @Override
@@ -163,24 +171,11 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
     }
 
     /**
-     * A {@link InternalRegion} represents a series of physically continuous 
buffers in the file,
-     * which are from the same subpartition, and has sequential buffer index.
-     *
-     * <p>The following example illustrates some physically continuous buffers 
in a file and regions
-     * upon them, where `x-y` denotes buffer from subpartition x with buffer 
index y, and `()`
-     * denotes a region.
-     *
-     * <p>(1-1, 1-2), (2-1), (2-2, 2-3), (1-5, 1-6), (1-4)
-     *
-     * <p>Note: The file may not contain all the buffers. E.g., 1-3 is missing 
in the above example.
-     *
-     * <p>Note: Buffers in file may have different orders than their buffer 
index. E.g., 1-4 comes
-     * after 1-6 in the above example.
-     *
-     * <p>Note: This index may not always maintain the longest possible 
regions. E.g., 2-1, 2-2, 2-3
-     * are in two separate regions.
+     * An {@link InternalRegion} is an implementation of {@link 
FileDataIndexRegionHelper.Region}.
+     * Note that this class introduces a new field to indicate whether each 
buffer in the region is
+     * released.
      */
-    static class InternalRegion {
+    public static class InternalRegion implements 
FileDataIndexRegionHelper.Region {
         /**
          * {@link InternalRegion} is consists of header and payload. 
(firstBufferIndex,
          * firstBufferOffset, numBuffer) are immutable header part that have 
fixed size. The array
@@ -189,30 +184,51 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
         public static final int HEADER_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
 
         private final int firstBufferIndex;
-        private final long firstBufferOffset;
+        private final long regionFileOffset;
         private final int numBuffers;
         private final boolean[] released;
 
-        private InternalRegion(int firstBufferIndex, long firstBufferOffset, 
int numBuffers) {
+        private InternalRegion(int firstBufferIndex, long regionFileOffset, 
int numBuffers) {
             this.firstBufferIndex = firstBufferIndex;
-            this.firstBufferOffset = firstBufferOffset;
+            this.regionFileOffset = regionFileOffset;
             this.numBuffers = numBuffers;
             this.released = new boolean[numBuffers];
             Arrays.fill(released, false);
         }
 
-        InternalRegion(
-                int firstBufferIndex, long firstBufferOffset, int numBuffers, 
boolean[] released) {
+        public InternalRegion(
+                int firstBufferIndex, long regionFileOffset, int numBuffers, 
boolean[] released) {
             this.firstBufferIndex = firstBufferIndex;
-            this.firstBufferOffset = firstBufferOffset;
+            this.regionFileOffset = regionFileOffset;
             this.numBuffers = numBuffers;
             this.released = released;
         }
 
-        boolean containBuffer(int bufferIndex) {
+        @Override
+        public boolean containBuffer(int bufferIndex) {
             return bufferIndex >= firstBufferIndex && bufferIndex < 
firstBufferIndex + numBuffers;
         }
 
+        @Override
+        public int getSize() {
+            return HEADER_SIZE + numBuffers;
+        }
+
+        @Override
+        public int getFirstBufferIndex() {
+            return firstBufferIndex;
+        }
+
+        @Override
+        public long getRegionFileOffset() {
+            return regionFileOffset;
+        }
+
+        @Override
+        public int getNumBuffers() {
+            return numBuffers;
+        }
+
         private HsFileDataIndex.ReadableRegion toReadableRegion(
                 int bufferIndex, int consumingOffset) {
             int nSkip = bufferIndex - firstBufferIndex;
@@ -223,32 +239,49 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
                 }
                 ++nReadable;
             }
-            return new ReadableRegion(nSkip, nReadable, firstBufferOffset);
+            return new ReadableRegion(nSkip, nReadable, regionFileOffset);
         }
 
         private void markBufferReleased(int bufferIndex) {
             released[bufferIndex - firstBufferIndex] = true;
         }
 
-        /** Get the total size in bytes of this region, including header and 
payload. */
-        int getSize() {
-            return HEADER_SIZE + numBuffers;
+        public boolean[] getReleased() {
+            return released;
         }
+    }
 
-        int getFirstBufferIndex() {
-            return firstBufferIndex;
-        }
+    /**
+     * The implementation of {@link 
FileDataIndexRegionHelper} for writing a 
region to the file or
+     * reading a region from the file.
+     * <p>Note that this type of region's length may be variable because it 
contains an array to
+     * indicate each buffer's release state.
+     */
+    public static class HsFileDataIndexRegionHelper
+            implements FileDataIndexRegionHelper<InternalRegion> {
 
-        long getFirstBufferOffset() {
-            return firstBufferOffset;
-        }
+        /** Reusable buffer used to read and write the immutable part of 
region. */
+        private final ByteBuffer regionHeaderBuffer =
+                
allocateAndConfigureBuffer(HsFileDataIndexImpl.InternalRegion.HEADER_SIZE);
 
-        int getNumBuffers() {
-            return numBuffers;
+        public static final HsFileDataIndexRegionHelper INSTANCE =
+                new HsFileDataIndexRegionHelper();
+
+        private HsFileDataIndexRegionHelper() {}
+
+        @Override
+        public void writeRegionToFile(FileChannel channel, InternalRegion 
region)
+                throws IOException {
+            FileRegionWriteReadUtils.writeHsInternalRegionToFile(
+                    channel, regionHeaderBuffer, region);
         }
 
-        boolean[] getReleased() {
-            return released;
+        @Override
+        public InternalRegion readRegionFromFile(FileChannel channel, long 
fileOffset)
+                throws IOException {
+            return FileRegionWriteReadUtils.readHsInternalRegionFromFile(
+                    channel, regionHeaderBuffer, fileOffset);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
deleted file mode 100644
index c87c0242b7e..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.hybrid;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.function.BiConsumer;
-
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
-
-/**
- * Default implementation of {@link HsFileDataIndexSpilledRegionManager}. This 
manager will handle
- * and spill regions in the following way:
- *
- * <ul>
- *   <li>All regions will be written to the same file, namely index file.
- *   <li>Multiple regions belonging to the same subpartition form a segment.
- *   <li>The regions in the same segment have no special relationship, but are 
only related to the
- *       order in which they are spilled.
- *   <li>Each segment is independent. Even if the previous segment is not 
full, the next segment can
- *       still be allocated.
- *   <li>If a region has been written to the index file already, spill it 
again will overwrite the
- *       previous region.
- *   <li>The very large region will monopolize a single segment.
- * </ul>
- *
- * <p>The relationships between index file and segment are shown below.
- *
- * <pre>
- *
- *         - - - - - - - - - Index File - - — - - - - - - - - -
- *        |                                                     |
- *        | - - — -Segment1 - - - -   - - - - Segment2- - - -   |
- *        ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 |  |
- *        | - - - - - - - - - - - -   - - - - - - - - - - - -   |
- *        |                                                     |
- *        | - - - - - - - -Segment3 - - - - - - -               |
- *        ||              Big Region             |              |
- *        | - - - - - - - - - - - - - - - - - - -               |
- *         - - - - - - - - - - - - - - - - - - - - - -- - - - -
- * </pre>
- */
-public class HsFileDataIndexSpilledRegionManagerImpl
-        implements HsFileDataIndexSpilledRegionManager {
-
-    /** Reusable buffer used to read and write the immutable part of region. */
-    private final ByteBuffer regionHeaderBuffer =
-            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE);
-
-    /**
-     * List of subpartition's segment meta. Each element is a treeMap contains 
all {@link
-     * SegmentMeta}'s of specific subpartition corresponding to the subscript. 
The value of this
-     * treeMap is a {@link SegmentMeta}, and the key is minBufferIndex of this 
segment. Only
-     * finished(i.e. no longer appended) segment will be put to here.
-     */
-    private final List<TreeMap<Integer, SegmentMeta>> 
subpartitionFinishedSegmentMetas;
-
-    private FileChannel channel;
-
-    /** The Offset of next segment, new segment will start from this offset. */
-    private long nextSegmentOffset = 0L;
-
-    private final long[] subpartitionCurrentOffset;
-
-    /** Free space of every subpartition's current segment. */
-    private final int[] subpartitionFreeSpaceInBytes;
-
-    /** Metadata of every subpartition's current segment. */
-    private final SegmentMeta[] currentSegmentMeta;
-
-    /**
-     * Default size of segment. If the size of a region is larger than this 
value, it will be
-     * allocated and occupy a single segment.
-     */
-    private final int segmentSizeInBytes;
-
-    /**
-     * This consumer is used to load region to cache. The first parameter is 
subpartition id, and
-     * second parameter is the region to load.
-     */
-    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
-
-    /**
-     * When region in segment needs to be loaded to cache, whether to load all 
regions of the entire
-     * segment.
-     */
-    private final boolean loadEntireSegmentToCache;
-
-    public HsFileDataIndexSpilledRegionManagerImpl(
-            int numSubpartitions,
-            Path indexFilePath,
-            int segmentSizeInBytes,
-            long maxCacheCapacity,
-            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
-        try {
-            this.channel =
-                    FileChannel.open(
-                            indexFilePath,
-                            StandardOpenOption.CREATE_NEW,
-                            StandardOpenOption.READ,
-                            StandardOpenOption.WRITE);
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
-        }
-        this.loadEntireSegmentToCache =
-                shouldLoadEntireSegmentToCache(
-                        numSubpartitions, segmentSizeInBytes, 
maxCacheCapacity);
-        this.subpartitionFinishedSegmentMetas = new 
ArrayList<>(numSubpartitions);
-        this.subpartitionCurrentOffset = new long[numSubpartitions];
-        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
-        this.currentSegmentMeta = new SegmentMeta[numSubpartitions];
-        for (int i = 0; i < numSubpartitions; i++) {
-            subpartitionFinishedSegmentMetas.add(new TreeMap<>());
-        }
-        this.cacheRegionConsumer = cacheRegionConsumer;
-        this.segmentSizeInBytes = segmentSizeInBytes;
-    }
-
-    @Override
-    public long findRegion(int subpartition, int bufferIndex, boolean 
loadToCache) {
-        // first of all, find the region from current writing segment.
-        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
-        if (segmentMeta != null) {
-            long regionOffset =
-                    findRegionInSegment(subpartition, bufferIndex, 
segmentMeta, loadToCache);
-            if (regionOffset != -1) {
-                return regionOffset;
-            }
-        }
-
-        // next, find the region from finished segments.
-        TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap =
-                subpartitionFinishedSegmentMetas.get(subpartition);
-        // all segments with a minBufferIndex less than or equal to this 
target buffer index may
-        // contain the target region.
-        for (SegmentMeta meta :
-                subpartitionSegmentMetaTreeMap.headMap(bufferIndex, 
true).values()) {
-            long regionOffset = findRegionInSegment(subpartition, bufferIndex, 
meta, loadToCache);
-            if (regionOffset != -1) {
-                return regionOffset;
-            }
-        }
-        return -1;
-    }
-
-    private long findRegionInSegment(
-            int subpartition, int bufferIndex, SegmentMeta meta, boolean 
loadToCache) {
-        if (bufferIndex <= meta.getMaxBufferIndex()) {
-            try {
-                return readSegmentAndLoadToCacheIfNeeded(
-                        subpartition, bufferIndex, meta, loadToCache);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        // -1 indicates that target region is not founded from this segment
-        return -1;
-    }
-
-    private long readSegmentAndLoadToCacheIfNeeded(
-            int subpartition, int bufferIndex, SegmentMeta meta, boolean 
loadToCache)
-            throws IOException {
-        // read all regions belong to this segment.
-        List<Tuple2<InternalRegion, Long>> regionAndOffsets =
-                readSegment(meta.getOffset(), meta.getNumRegions());
-        // -1 indicates that target region is not founded from this segment.
-        long targetRegionOffset = -1;
-        InternalRegion targetRegion = null;
-        // traverse all regions to find target.
-        Iterator<Tuple2<InternalRegion, Long>> it = 
regionAndOffsets.iterator();
-        while (it.hasNext()) {
-            Tuple2<InternalRegion, Long> regionAndOffset = it.next();
-            InternalRegion region = regionAndOffset.f0;
-            // whether the region contains this buffer.
-            if (region.containBuffer(bufferIndex)) {
-                // target region is founded.
-                targetRegion = region;
-                targetRegionOffset = regionAndOffset.f1;
-                it.remove();
-            }
-        }
-
-        // target region is founded and need to load to cache.
-        if (targetRegion != null && loadToCache) {
-            if (loadEntireSegmentToCache) {
-                // first of all, load all regions except target to cache.
-                regionAndOffsets.forEach(
-                        (regionAndOffsetTuple) ->
-                                cacheRegionConsumer.accept(subpartition, 
regionAndOffsetTuple.f0));
-                // load target region to cache in the end, this is to prevent 
the target
-                // from being eliminated.
-                cacheRegionConsumer.accept(subpartition, targetRegion);
-            } else {
-                // only load target region to cache.
-                cacheRegionConsumer.accept(subpartition, targetRegion);
-            }
-        }
-        // return the offset of target region.
-        return targetRegionOffset;
-    }
-
-    @Override
-    public void appendOrOverwriteRegion(int subpartition, InternalRegion 
newRegion)
-            throws IOException {
-        // This method will only be called when we want to eliminate a region. 
We can't let the
-        // region be reloaded into the cache, otherwise it will lead to an 
infinite loop.
-        long oldRegionOffset = findRegion(subpartition, 
newRegion.getFirstBufferIndex(), false);
-        if (oldRegionOffset != -1) {
-            // if region is already exists in file, overwrite it.
-            writeRegionToOffset(oldRegionOffset, newRegion);
-        } else {
-            // otherwise, append region to segment.
-            appendRegion(subpartition, newRegion);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (channel != null) {
-            channel.close();
-        }
-    }
-
-    private static boolean shouldLoadEntireSegmentToCache(
-            int numSubpartitions, int segmentSizeInBytes, long 
maxCacheCapacity) {
-        // If the cache can put at least two segments (one for reading and one 
for writing) for each
-        // subpartition, it is reasonable to load the entire segment into 
memory, which can improve
-        // the cache hit rate. On the contrary, if the cache capacity is 
small, loading a large
-        // number of regions will lead to performance degradation,only the 
target region should be
-        // loaded.
-        return ((long) 2 * numSubpartitions * segmentSizeInBytes) / 
InternalRegion.HEADER_SIZE
-                <= maxCacheCapacity;
-    }
-
-    private void appendRegion(int subpartition, InternalRegion region) throws 
IOException {
-        int regionSize = region.getSize();
-        // check whether we have enough space to append this region.
-        if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
-            // No enough free space, start a new segment. Note that if region 
is larger than
-            // segment's size, this will start a new segment only contains the 
big region.
-            startNewSegment(subpartition, Math.max(regionSize, 
segmentSizeInBytes));
-        }
-        // spill this region to current offset of file index.
-        writeRegionToOffset(subpartitionCurrentOffset[subpartition], region);
-        // a new region was appended to segment, update it.
-        updateSegment(subpartition, region);
-    }
-
-    private void writeRegionToOffset(long offset, InternalRegion region) 
throws IOException {
-        channel.position(offset);
-        InternalRegionWriteReadUtils.writeRegionToFile(channel, 
regionHeaderBuffer, region);
-    }
-
-    private void startNewSegment(int subpartition, int newSegmentSize) {
-        SegmentMeta oldSegmentMeta = currentSegmentMeta[subpartition];
-        currentSegmentMeta[subpartition] = new SegmentMeta(nextSegmentOffset);
-        subpartitionCurrentOffset[subpartition] = nextSegmentOffset;
-        nextSegmentOffset += newSegmentSize;
-        subpartitionFreeSpaceInBytes[subpartition] = newSegmentSize;
-        if (oldSegmentMeta != null) {
-            // put the finished segment to subpartitionFinishedSegmentMetas.
-            subpartitionFinishedSegmentMetas
-                    .get(subpartition)
-                    .put(oldSegmentMeta.minBufferIndex, oldSegmentMeta);
-        }
-    }
-
-    private void updateSegment(int subpartition, InternalRegion region) {
-        int regionSize = region.getSize();
-        subpartitionFreeSpaceInBytes[subpartition] -= regionSize;
-        subpartitionCurrentOffset[subpartition] += regionSize;
-        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
-        segmentMeta.addRegion(
-                region.getFirstBufferIndex(),
-                region.getFirstBufferIndex() + region.getNumBuffers() - 1);
-    }
-
-    /**
-     * Read segment from index file.
-     *
-     * @param offset offset of this segment.
-     * @param numRegions number of regions of this segment.
-     * @return List of all regions and its offset belong to this segment.
-     */
-    private List<Tuple2<InternalRegion, Long>> readSegment(long offset, int 
numRegions)
-            throws IOException {
-        List<Tuple2<InternalRegion, Long>> regionAndOffsets = new 
ArrayList<>();
-        for (int i = 0; i < numRegions; i++) {
-            InternalRegion region =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
-                            channel, regionHeaderBuffer, offset);
-            regionAndOffsets.add(Tuple2.of(region, offset));
-            offset += region.getSize();
-        }
-        return regionAndOffsets;
-    }
-
-    /**
-     * Metadata of spilled regions segment. When a segment is finished(i.e. no 
longer appended), its
-     * corresponding {@link SegmentMeta} becomes immutable.
-     */
-    private static class SegmentMeta {
-        /**
-         * Minimum buffer index of this segment. It is the smallest 
bufferIndex(inclusive) in all
-         * regions belong to this segment.
-         */
-        private int minBufferIndex;
-
-        /**
-         * Maximum buffer index of this segment. It is the largest 
bufferIndex(inclusive) in all
-         * regions belong to this segment.
-         */
-        private int maxBufferIndex;
-
-        /** Number of regions belong to this segment. */
-        private int numRegions;
-
-        /** The index file offset of this segment. */
-        private final long offset;
-
-        public SegmentMeta(long offset) {
-            this.offset = offset;
-            this.minBufferIndex = Integer.MAX_VALUE;
-            this.maxBufferIndex = 0;
-            this.numRegions = 0;
-        }
-
-        public int getMaxBufferIndex() {
-            return maxBufferIndex;
-        }
-
-        public long getOffset() {
-            return offset;
-        }
-
-        public int getNumRegions() {
-            return numRegions;
-        }
-
-        public void addRegion(int firstBufferIndexOfRegion, int 
maxBufferIndexOfRegion) {
-            if (firstBufferIndexOfRegion < minBufferIndex) {
-                this.minBufferIndex = firstBufferIndexOfRegion;
-            }
-            if (maxBufferIndexOfRegion > maxBufferIndex) {
-                this.maxBufferIndex = maxBufferIndexOfRegion;
-            }
-            this.numRegions++;
-        }
-    }
-
-    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
-    public static class Factory implements 
HsFileDataIndexSpilledRegionManager.Factory {
-        private final int segmentSizeInBytes;
-
-        private final long maxCacheCapacity;
-
-        public Factory(int segmentSizeInBytes, long maxCacheCapacity) {
-            this.segmentSizeInBytes = segmentSizeInBytes;
-            this.maxCacheCapacity = maxCacheCapacity;
-        }
-
-        @Override
-        public HsFileDataIndexSpilledRegionManager create(
-                int numSubpartitions,
-                Path indexFilePath,
-                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
-            return new HsFileDataIndexSpilledRegionManagerImpl(
-                    numSubpartitions,
-                    indexFilePath,
-                    segmentSizeInBytes,
-                    maxCacheCapacity,
-                    cacheRegionConsumer);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 602e13a62fa..c0298325caf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -116,7 +116,7 @@ public class HsResultPartition extends ResultPartition {
                 new HsFileDataIndexImpl(
                         isBroadcastOnly ? 1 : numSubpartitions,
                         new File(dataFileBashPath + 
INDEX_FILE_SUFFIX).toPath(),
-                        
hybridShuffleConfiguration.getSpilledIndexSegmentSize(),
+                        hybridShuffleConfiguration.getRegionGroupSizeInBytes(),
                         
hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax());
         this.hybridShuffleConfiguration = hybridShuffleConfiguration;
         this.isBroadcastOnly = isBroadcastOnly;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index 27667afcd47..c41f19e9e24 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -40,7 +40,7 @@ public class HybridShuffleConfiguration {
 
     private static final long DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX = 
Long.MAX_VALUE;
 
-    private static final int DEFAULT_SPILLED_INDEX_SEGMENT_SIZE = 256;
+    private static final int DEFAULT_REGION_GROUP_SIZE_IN_BYTES = 256;
 
     private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
             SpillingStrategyType.FULL;
@@ -55,7 +55,7 @@ public class HybridShuffleConfiguration {
 
     private final long numRetainedInMemoryRegionsMax;
 
-    private final int spilledIndexSegmentSize;
+    private final int regionGroupSizeInBytes;
 
     private final long bufferPoolSizeCheckIntervalMs;
 
@@ -87,7 +87,7 @@ public class HybridShuffleConfiguration {
             SpillingStrategyType spillingStrategyType,
             long bufferPoolSizeCheckIntervalMs,
             long numRetainedInMemoryRegionsMax,
-            int spilledIndexSegmentSize) {
+            int regionGroupSizeInBytes) {
         this.maxBuffersReadAhead = maxBuffersReadAhead;
         this.bufferRequestTimeout = bufferRequestTimeout;
         this.maxRequestedBuffers = maxRequestedBuffers;
@@ -100,7 +100,7 @@ public class HybridShuffleConfiguration {
         this.spillingStrategyType = spillingStrategyType;
         this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
         this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
-        this.spilledIndexSegmentSize = spilledIndexSegmentSize;
+        this.regionGroupSizeInBytes = regionGroupSizeInBytes;
     }
 
     public static Builder builder(int numSubpartitions, int 
numBuffersPerRequest) {
@@ -172,8 +172,8 @@ public class HybridShuffleConfiguration {
     }
 
     /** Segment size of hybrid spilled file data index. */
-    public int getSpilledIndexSegmentSize() {
-        return spilledIndexSegmentSize;
+    public int getRegionGroupSizeInBytes() {
+        return regionGroupSizeInBytes;
     }
 
     /** Max number of hybrid retained regions in memory. */
@@ -211,7 +211,7 @@ public class HybridShuffleConfiguration {
 
         private long numRetainedInMemoryRegionsMax = 
DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX;
 
-        private int spilledIndexSegmentSize = 
DEFAULT_SPILLED_INDEX_SEGMENT_SIZE;
+        private int regionGroupSizeInBytes = 
DEFAULT_REGION_GROUP_SIZE_IN_BYTES;
 
         private final int numSubpartitions;
 
@@ -275,8 +275,8 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
-        public Builder setSpilledIndexSegmentSize(int spilledIndexSegmentSize) 
{
-            this.spilledIndexSegmentSize = spilledIndexSegmentSize;
+        public Builder setRegionGroupSizeInBytes(int regionGroupSizeInBytes) {
+            this.regionGroupSizeInBytes = regionGroupSizeInBytes;
             return this;
         }
 
@@ -293,7 +293,7 @@ public class HybridShuffleConfiguration {
                     spillingStrategyType,
                     bufferPoolSizeCheckIntervalMs,
                     numRetainedInMemoryRegionsMax,
-                    spilledIndexSegmentSize);
+                    regionGroupSizeInBytes);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java
similarity index 77%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java
index 171007e3220..24029da69a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 
@@ -42,25 +41,25 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * and automatically caches some indexes in memory. When there are too many 
cached indexes, it is
  * this class's responsibility to decide and eliminate some indexes to disk.
  */
-public class HsFileDataIndexCache {
+public class FileDataIndexCache<T extends FileDataIndexRegionHelper.Region> {
     /**
-     * This struct stores all in memory {@link InternalRegion}s. Each element 
is a treeMap contains
-     * all in memory {@link InternalRegion}'s of specific subpartition 
corresponding to the
-     * subscript. The value of this treeMap is a {@link InternalRegion}, and 
the key is
-     * firstBufferIndex of this region. Only cached in memory region will be 
put to here.
+     * This struct stores all in memory {@link 
FileDataIndexRegionHelper.Region}s. Each element is a
+     * treeMap contains all in memory {@link 
FileDataIndexRegionHelper.Region}'s of specific
+     * subpartition corresponding to the subscript. The value of this treeMap 
is a {@link
+     * FileDataIndexRegionHelper.Region}, and the key is firstBufferIndex of 
this region. Only
+     * cached in memory region will be put to here.
      */
-    private final List<TreeMap<Integer, InternalRegion>>
-            subpartitionFirstBufferIndexInternalRegions;
+    private final List<TreeMap<Integer, T>> 
subpartitionFirstBufferIndexRegions;
 
     /**
      * This cache is used to help eliminate regions from memory. It is only 
maintains the key of
      * each in memory region, the value is just a placeholder. Note that this 
internal cache must be
-     * consistent with subpartitionFirstBufferIndexInternalRegions, that means 
both of them must add
+     * consistent with subpartitionFirstBufferIndexRegions, that means 
both of them must add
      * or delete elements at the same time.
      */
     private final Cache<CachedRegionKey, Object> internalCache;
 
-    private final HsFileDataIndexSpilledRegionManager spilledRegionManager;
+    private final FileDataIndexSpilledRegionManager<T> spilledRegionManager;
 
     private final Path indexFilePath;
 
@@ -70,14 +69,14 @@ public class HsFileDataIndexCache {
      */
     public static final Object PLACEHOLDER = new Object();
 
-    public HsFileDataIndexCache(
+    public FileDataIndexCache(
             int numSubpartitions,
             Path indexFilePath,
             long numRetainedInMemoryRegionsMax,
-            HsFileDataIndexSpilledRegionManager.Factory 
spilledRegionManagerFactory) {
-        this.subpartitionFirstBufferIndexInternalRegions = new 
ArrayList<>(numSubpartitions);
+            FileDataIndexSpilledRegionManager.Factory<T> 
spilledRegionManagerFactory) {
+        this.subpartitionFirstBufferIndexRegions = new 
ArrayList<>(numSubpartitions);
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
-            subpartitionFirstBufferIndexInternalRegions.add(new TreeMap<>());
+            subpartitionFirstBufferIndexRegions.add(new TreeMap<>());
         }
         this.internalCache =
                 CacheBuilder.newBuilder()
@@ -93,7 +92,7 @@ public class HsFileDataIndexCache {
                             if (!getCachedRegionContainsTargetBufferIndex(
                                             subpartition, 
region.getFirstBufferIndex())
                                     .isPresent()) {
-                                subpartitionFirstBufferIndexInternalRegions
+                                subpartitionFirstBufferIndexRegions
                                         .get(subpartition)
                                         .put(region.getFirstBufferIndex(), 
region);
                                 internalCache.put(
@@ -117,12 +116,12 @@ public class HsFileDataIndexCache {
      * @return If target region can be founded from memory or disk, return 
optional contains target
      *     region. Otherwise, return {@code Optional#empty()};
      */
-    public Optional<InternalRegion> get(int subpartitionId, int bufferIndex) {
+    public Optional<T> get(int subpartitionId, int bufferIndex) {
         // first of all, try to get region in memory.
-        Optional<InternalRegion> regionOpt =
+        Optional<T> regionOpt =
                 getCachedRegionContainsTargetBufferIndex(subpartitionId, 
bufferIndex);
         if (regionOpt.isPresent()) {
-            InternalRegion region = regionOpt.get();
+            T region = regionOpt.get();
             checkNotNull(
                     // this is needed for cache entry remove algorithm like 
LRU.
                     internalCache.getIfPresent(
@@ -139,22 +138,20 @@ public class HsFileDataIndexCache {
      * Put regions to cache.
      *
      * @param subpartition the subpartition's id of regions.
-     * @param internalRegions regions to be cached.
+     * @param fileRegions regions to be cached.
      */
-    public void put(int subpartition, List<InternalRegion> internalRegions) {
-        TreeMap<Integer, InternalRegion> treeMap =
-                subpartitionFirstBufferIndexInternalRegions.get(subpartition);
-        for (InternalRegion internalRegion : internalRegions) {
+    public void put(int subpartition, List<T> fileRegions) {
+        TreeMap<Integer, T> treeMap = 
subpartitionFirstBufferIndexRegions.get(subpartition);
+        for (T region : fileRegions) {
             internalCache.put(
-                    new CachedRegionKey(subpartition, 
internalRegion.getFirstBufferIndex()),
-                    PLACEHOLDER);
-            treeMap.put(internalRegion.getFirstBufferIndex(), internalRegion);
+                    new CachedRegionKey(subpartition, 
region.getFirstBufferIndex()), PLACEHOLDER);
+            treeMap.put(region.getFirstBufferIndex(), region);
         }
     }
 
     /**
-     * Close {@link HsFileDataIndexCache}, this will delete the index file. 
After that, the index
-     * can no longer be read or written.
+     * Close {@link FileDataIndexCache}; this will delete the index file. 
After that, the index can
+     * no longer be read or written.
      */
     public void close() throws IOException {
         spilledRegionManager.close();
@@ -165,8 +162,8 @@ public class HsFileDataIndexCache {
     private void handleRemove(RemovalNotification<CachedRegionKey, Object> 
removedEntry) {
         CachedRegionKey removedKey = removedEntry.getKey();
         // remove the corresponding region from memory.
-        InternalRegion removedRegion =
-                subpartitionFirstBufferIndexInternalRegions
+        T removedRegion =
+                subpartitionFirstBufferIndexRegions
                         .get(removedKey.getSubpartition())
                         .remove(removedKey.getFirstBufferIndex());
 
@@ -175,7 +172,7 @@ public class HsFileDataIndexCache {
         writeRegion(removedKey.getSubpartition(), removedRegion);
     }
 
-    private void writeRegion(int subpartition, InternalRegion region) {
+    private void writeRegion(int subpartition, T region) {
         try {
             spilledRegionManager.appendOrOverwriteRegion(subpartition, region);
         } catch (IOException e) {
@@ -191,10 +188,10 @@ public class HsFileDataIndexCache {
      * @return If target region is cached in memory, return optional contains 
target region.
      *     Otherwise, return {@code Optional#empty()};
      */
-    private Optional<InternalRegion> getCachedRegionContainsTargetBufferIndex(
+    private Optional<T> getCachedRegionContainsTargetBufferIndex(
             int subpartitionId, int bufferIndex) {
         return Optional.ofNullable(
-                        subpartitionFirstBufferIndexInternalRegions
+                        subpartitionFirstBufferIndexRegions
                                 .get(subpartitionId)
                                 .floorEntry(bufferIndex))
                 .map(Map.Entry::getValue)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
new file mode 100644
index 00000000000..c0d4270e109
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * {@link FileDataIndexRegionHelper} is responsible for writing a {@link 
Region} to the file or
+ * reading a {@link Region} from file.
+ */
+public interface FileDataIndexRegionHelper<T extends 
FileDataIndexRegionHelper.Region> {
+
+    /**
+     * Write the region to the file.
+     *
+     * @param channel the file channel to write the region
+     * @param region the region to be written to the file
+     */
+    void writeRegionToFile(FileChannel channel, T region) throws IOException;
+
+    /**
+     * Read a region from the file.
+     *
+     * @param channel the file channel to read the region
+     * @param fileOffset the current region data is from this file offset, so 
start reading the file
+     *     from the offset when reading the region
+     * @return the region read from the file
+     */
+    T readRegionFromFile(FileChannel channel, long fileOffset) throws 
IOException;
+
+    /**
+     * A {@link Region} Represents a series of buffers that are:
+     *
+     * <ul>
+     *   <li>From the same subpartition
+     *   <li>Logically (i.e. buffer index) consecutive
+     *   <li>Physically (i.e. offset in the file) consecutive
+     * </ul>
+     *
+     * <p>The following example illustrates some physically continuous buffers 
in a file and regions
+     * upon them, where `x-y` denotes buffer from subpartition x with buffer 
index y, and `()`
+     * denotes a region.
+     *
+     * <p>(1-1, 1-2), (2-1), (2-2, 2-3), (1-5, 1-6), (1-4)
+     *
+     * <p>Note: The file may not contain all the buffers. E.g., 1-3 is missing 
in the above example.
+     *
+     * <p>Note: Buffers in file may have different orders than their buffer 
index. E.g., 1-4 comes
+     * after 1-6 in the above example.
+     *
+     * <p>Note: This index may not always maintain the longest possible 
regions. E.g., 2-1, 2-2, 2-3
+     * are in two separate regions.
+     */
+    interface Region {
+
+        /** Get the total size in bytes of this region, including the fields 
and the buffers. */
+        int getSize();
+
+        /** Get the first buffer index of this region. */
+        int getFirstBufferIndex();
+
+        /** Get the file start offset of this region. */
+        long getRegionFileOffset();
+
+        /** Get the number of buffers in this region. */
+        int getNumBuffers();
+
+        /**
+         * Whether the current region contains the buffer.
+         *
+         * @param bufferIndex the specific buffer index
+         */
+        boolean containBuffer(int bufferIndex);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java
similarity index 77%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java
index e0a413a5c9f..2e3e6064a82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java
@@ -16,23 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
-
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.function.BiConsumer;
 
 /** This class is responsible for spilling region to disk and managing these 
spilled regions. */
-public interface HsFileDataIndexSpilledRegionManager extends AutoCloseable {
+public interface FileDataIndexSpilledRegionManager<T extends 
FileDataIndexRegionHelper.Region>
+        extends AutoCloseable {
     /**
      * Write this region to index file. If target region already spilled, 
overwrite it.
      *
      * @param subpartition the subpartition id of this region.
      * @param region the region to be spilled to index file.
      */
-    void appendOrOverwriteRegion(int subpartition, InternalRegion region) 
throws IOException;
+    void appendOrOverwriteRegion(int subpartition, T region) throws 
IOException;
 
     /**
      * Find the region contains target bufferIndex and belong to target 
subpartition.
@@ -48,11 +47,11 @@ public interface HsFileDataIndexSpilledRegionManager 
extends AutoCloseable {
     /** Close this spilled region manager. */
     void close() throws IOException;
 
-    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
-    interface Factory {
-        HsFileDataIndexSpilledRegionManager create(
+    /** Factory of {@link FileDataIndexSpilledRegionManager}. */
+    interface Factory<T extends FileDataIndexRegionHelper.Region> {
+        FileDataIndexSpilledRegionManager<T> create(
                 int numSubpartitions,
                 Path indexFilePath,
-                BiConsumer<Integer, InternalRegion> cacheRegionConsumer);
+                BiConsumer<Integer, T> cacheRegionConsumer);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java
new file mode 100644
index 00000000000..d468e2f2dd2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+/**
+ * Default implementation of {@link FileDataIndexSpilledRegionManager}. This 
manager will handle and
+ * spill regions in the following way:
+ *
+ * <ul>
+ *   <li>All regions will be written to the same file, namely index file.
+ *   <li>Multiple regions belonging to the same subpartition form a region 
group.
+ *   <li>The regions in the same region group have no special relationship, 
but are only related to
+ *       the order in which they are spilled.
+ *   <li>Each region group is independent. Even if the previous region group 
is not full, the next
+ *       region group can still be allocated.
+ *   <li>If a region has been written to the index file already, spill it 
again will overwrite the
+ *       previous region.
+ *   <li>The very large region will monopolize a single region group.
+ * </ul>
+ *
+ * <p>The relationships between index file and region group are shown below.
+ *
+ * <pre>
+ *
+ *         - - - - - - - - - Index File - - - - - - - - - - - -
+ *        |                                                     |
+ *        | - - - -RegionGroup1 - -   - - RegionGroup2- - - -   |
+ *        ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 |  |
+ *        | - - - - - - - - - - - -   - - - - - - - - - - - -   |
+ *        |                                                     |
+ *        | - - - - - - - -RegionGroup3 - - - - -               |
+ *        ||              Big Region             |              |
+ *        | - - - - - - - - - - - - - - - - - - -               |
+ *         - - - - - - - - - - - - - - - - - - - - - -- - - - -
+ * </pre>
+ */
+public class FileDataIndexSpilledRegionManagerImpl<T extends 
FileDataIndexRegionHelper.Region>
+        implements FileDataIndexSpilledRegionManager<T> {
+
+    /**
+     * List of subpartition's region group meta. Each element is a treeMap 
contains all {@link
+     * RegionGroup}'s of specific subpartition corresponding to the subscript. 
The value of this
+     * treeMap is a {@link RegionGroup}, and the key is minBufferIndex of this 
region group. Only
+     * finished(i.e. no longer appended) region group will be put to here.
+     */
+    private final List<TreeMap<Integer, RegionGroup>> 
subpartitionFinishedRegionGroupMetas;
+
+    private FileChannel channel;
+
+    /** The Offset of next region group, new region group will start from this 
offset. */
+    private long nextRegionGroupOffset = 0L;
+
+    private final long[] subpartitionCurrentOffset;
+
+    /** Free space of every subpartition's current region group. */
+    private final int[] subpartitionFreeSpaceInBytes;
+
+    /** Metadata of every subpartition's current region group. */
+    private final RegionGroup[] currentRegionGroup;
+
+    /**
+     * Default size of region group. If the size of a region is larger than 
this value, it will be
+     * allocated and occupy a single region group.
+     */
+    private final int regionGroupSizeInBytes;
+
+    /**
+     * This consumer is used to load region to cache. The first parameter is 
subpartition id, and
+     * second parameter is the region to load.
+     */
+    private final BiConsumer<Integer, T> cacheRegionConsumer;
+
+    private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;
+
+    /**
+     * When region in region group needs to be loaded to cache, whether to 
load all regions of the
+     * entire region group.
+     */
+    private final boolean loadEntireRegionGroupToCache;
+
+    public FileDataIndexSpilledRegionManagerImpl(
+            int numSubpartitions,
+            Path indexFilePath,
+            int regionGroupSizeInBytes,
+            long maxCacheCapacity,
+            int regionHeaderSize,
+            BiConsumer<Integer, T> cacheRegionConsumer,
+            FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
+        try {
+            this.channel =
+                    FileChannel.open(
+                            indexFilePath,
+                            StandardOpenOption.CREATE_NEW,
+                            StandardOpenOption.READ,
+                            StandardOpenOption.WRITE);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+        this.loadEntireRegionGroupToCache =
+                shouldLoadEntireRegionGroupToCache(
+                        numSubpartitions,
+                        regionGroupSizeInBytes,
+                        maxCacheCapacity,
+                        regionHeaderSize);
+        this.subpartitionFinishedRegionGroupMetas = new 
ArrayList<>(numSubpartitions);
+        this.subpartitionCurrentOffset = new long[numSubpartitions];
+        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
+        this.currentRegionGroup = new RegionGroup[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            subpartitionFinishedRegionGroupMetas.add(new TreeMap<>());
+        }
+        this.cacheRegionConsumer = cacheRegionConsumer;
+        this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
+        this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+    }
+
+    @Override
+    public long findRegion(int subpartition, int bufferIndex, boolean 
loadToCache) {
+        // first of all, find the region from current writing region group.
+        RegionGroup regionGroup = currentRegionGroup[subpartition];
+        if (regionGroup != null) {
+            long regionOffset =
+                    findRegionInRegionGroup(subpartition, bufferIndex, 
regionGroup, loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+
+        // next, find the region from finished region groups.
+        TreeMap<Integer, RegionGroup> subpartitionRegionGroupMetaTreeMap =
+                subpartitionFinishedRegionGroupMetas.get(subpartition);
+        // all region groups with a minBufferIndex less than or equal to this 
target buffer index
+        // may contain the target region.
+        for (RegionGroup meta :
+                subpartitionRegionGroupMetaTreeMap.headMap(bufferIndex, 
true).values()) {
+            long regionOffset =
+                    findRegionInRegionGroup(subpartition, bufferIndex, meta, 
loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+        return -1;
+    }
+
+    private long findRegionInRegionGroup(
+            int subpartition, int bufferIndex, RegionGroup meta, boolean 
loadToCache) {
+        if (bufferIndex <= meta.getMaxBufferIndex()) {
+            try {
+                return readRegionGroupAndLoadToCacheIfNeeded(
+                        subpartition, bufferIndex, meta, loadToCache);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        // -1 indicates that the target region is not found in this region 
group
+        return -1;
+    }
+
+    private long readRegionGroupAndLoadToCacheIfNeeded(
+            int subpartition, int bufferIndex, RegionGroup meta, boolean 
loadToCache)
+            throws IOException {
+        // read all regions belong to this region group.
+        List<Tuple2<T, Long>> regionAndOffsets =
+                readRegionGroup(meta.getOffset(), meta.getNumRegions());
+        // -1 indicates that the target region is not found in this region 
group.
+        long targetRegionOffset = -1;
+        T targetRegion = null;
+        // traverse all regions to find target.
+        Iterator<Tuple2<T, Long>> it = regionAndOffsets.iterator();
+        while (it.hasNext()) {
+            Tuple2<T, Long> regionAndOffset = it.next();
+            T region = regionAndOffset.f0;
+            // whether the region contains this buffer.
+            if (region.containBuffer(bufferIndex)) {
+                // target region is found.
+                targetRegion = region;
+                targetRegionOffset = regionAndOffset.f1;
+                it.remove();
+            }
+        }
+
+        // target region is found and needs to be loaded to cache.
+        if (targetRegion != null && loadToCache) {
+            if (loadEntireRegionGroupToCache) {
+                // first of all, load all regions except target to cache.
+                regionAndOffsets.forEach(
+                        (regionAndOffsetTuple) ->
+                                cacheRegionConsumer.accept(subpartition, 
regionAndOffsetTuple.f0));
+                // load target region to cache in the end, this is to prevent 
the target
+                // from being eliminated.
+                cacheRegionConsumer.accept(subpartition, targetRegion);
+            } else {
+                // only load target region to cache.
+                cacheRegionConsumer.accept(subpartition, targetRegion);
+            }
+        }
+        // return the offset of target region.
+        return targetRegionOffset;
+    }
+
+    @Override
+    public void appendOrOverwriteRegion(int subpartition, T newRegion) throws 
IOException {
+        // This method will only be called when we want to eliminate a region. 
We can't let the
+        // region be reloaded into the cache, otherwise it will lead to an 
infinite loop.
+        long oldRegionOffset = findRegion(subpartition, 
newRegion.getFirstBufferIndex(), false);
+        if (oldRegionOffset != -1) {
+            // if the region already exists in the file, overwrite it.
+            writeRegionToOffset(oldRegionOffset, newRegion);
+        } else {
+            // otherwise, append region to region group.
+            appendRegion(subpartition, newRegion);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    private static boolean shouldLoadEntireRegionGroupToCache(
+            int numSubpartitions,
+            int regionGroupSizeInBytes,
+            long maxCacheCapacity,
+            int regionHeaderSize) {
+        // If the cache can put at least two region groups (one for reading 
and one for writing) for
+        // each subpartition, it is reasonable to load the entire region group 
into memory, which
+        // can improve the cache hit rate. On the contrary, if the cache 
capacity is small, loading
+        // a large number of regions will lead to performance degradation, only 
the target region
+        // should be loaded.
+        return ((long) 2 * numSubpartitions * regionGroupSizeInBytes) / 
regionHeaderSize
+                <= maxCacheCapacity;
+    }
+
+    private void appendRegion(int subpartition, T region) throws IOException {
+        int regionSize = region.getSize();
+        // check whether we have enough space to append this region.
+        if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
+            // No enough free space, start a new region group. Note that if 
region is larger than
+            // region group's size, this will start a new region group that only 
contains the big region.
+            startNewRegionGroup(subpartition, Math.max(regionSize, 
regionGroupSizeInBytes));
+        }
+        // spill this region to current offset of file index.
+        writeRegionToOffset(subpartitionCurrentOffset[subpartition], region);
+        // a new region was appended to region group, update it.
+        updateRegionGroup(subpartition, region);
+    }
+
+    private void writeRegionToOffset(long offset, T region) throws IOException 
{
+        channel.position(offset);
+        fileDataIndexRegionHelper.writeRegionToFile(channel, region);
+    }
+
+    private void startNewRegionGroup(int subpartition, int newRegionGroupSize) 
{
+        RegionGroup oldRegionGroup = currentRegionGroup[subpartition];
+        currentRegionGroup[subpartition] = new 
RegionGroup(nextRegionGroupOffset);
+        subpartitionCurrentOffset[subpartition] = nextRegionGroupOffset;
+        nextRegionGroupOffset += newRegionGroupSize;
+        subpartitionFreeSpaceInBytes[subpartition] = newRegionGroupSize;
+        if (oldRegionGroup != null) {
+            // put the finished region group to 
subpartitionFinishedRegionGroupMetas.
+            subpartitionFinishedRegionGroupMetas
+                    .get(subpartition)
+                    .put(oldRegionGroup.minBufferIndex, oldRegionGroup);
+        }
+    }
+
+    private void updateRegionGroup(int subpartition, T region) {
+        int regionSize = region.getSize();
+        subpartitionFreeSpaceInBytes[subpartition] -= regionSize;
+        subpartitionCurrentOffset[subpartition] += regionSize;
+        RegionGroup regionGroup = currentRegionGroup[subpartition];
+        regionGroup.addRegion(
+                region.getFirstBufferIndex(),
+                region.getFirstBufferIndex() + region.getNumBuffers() - 1);
+    }
+
+    /**
+     * Read region group from index file.
+     *
+     * @param offset offset of this region group.
+     * @param numRegions number of regions of this region group.
+     * @return List of all regions and its offset belong to this region group.
+     */
+    private List<Tuple2<T, Long>> readRegionGroup(long offset, int numRegions) 
throws IOException {
+        List<Tuple2<T, Long>> regionAndOffsets = new ArrayList<>();
+        for (int i = 0; i < numRegions; i++) {
+            T region = fileDataIndexRegionHelper.readRegionFromFile(channel, 
offset);
+            regionAndOffsets.add(Tuple2.of(region, offset));
+            offset += region.getSize();
+        }
+        return regionAndOffsets;
+    }
+
+    /**
+     * Metadata of a spilled region group. When a region group is 
finished (i.e. no longer
+     * appended), its corresponding {@link RegionGroup} becomes immutable.
+     */
+    private static class RegionGroup {
+        /**
+         * Minimum buffer index of this region group. It is the smallest 
bufferIndex(inclusive) in
+         * all regions belong to this region group.
+         */
+        private int minBufferIndex;
+
+        /**
+         * Maximum buffer index of this region group. It is the largest 
bufferIndex(inclusive) in
+         * all regions belong to this region group.
+         */
+        private int maxBufferIndex;
+
+        /** Number of regions belong to this region group. */
+        private int numRegions;
+
+        /** The index file offset of this region group. */
+        private final long offset;
+
+        public RegionGroup(long offset) {
+            this.offset = offset;
+            this.minBufferIndex = Integer.MAX_VALUE;
+            this.maxBufferIndex = 0;
+            this.numRegions = 0;
+        }
+
+        public int getMaxBufferIndex() {
+            return maxBufferIndex;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public int getNumRegions() {
+            return numRegions;
+        }
+
+        public void addRegion(int firstBufferIndexOfRegion, int 
maxBufferIndexOfRegion) {
+            if (firstBufferIndexOfRegion < minBufferIndex) {
+                this.minBufferIndex = firstBufferIndexOfRegion;
+            }
+            if (maxBufferIndexOfRegion > maxBufferIndex) {
+                this.maxBufferIndex = maxBufferIndexOfRegion;
+            }
+            this.numRegions++;
+        }
+    }
+
+    /** Factory of {@link FileDataIndexSpilledRegionManager}. */
+    public static class Factory<T extends FileDataIndexRegionHelper.Region>
+            implements FileDataIndexSpilledRegionManager.Factory<T> {
+        private final int regionGroupSizeInBytes;
+
+        private final long maxCacheCapacity;
+
+        private final int regionHeaderSize;
+
+        private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;
+
+        public Factory(
+                int regionGroupSizeInBytes,
+                long maxCacheCapacity,
+                int regionHeaderSize,
+                FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
+            this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+            this.maxCacheCapacity = maxCacheCapacity;
+            this.regionHeaderSize = regionHeaderSize;
+            this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
+        }
+
+        @Override
+        public FileDataIndexSpilledRegionManager<T> create(
+                int numSubpartitions,
+                Path indexFilePath,
+                BiConsumer<Integer, T> cacheRegionConsumer) {
+            return new FileDataIndexSpilledRegionManagerImpl<>(
+                    numSubpartitions,
+                    indexFilePath,
+                    regionGroupSizeInBytes,
+                    maxCacheCapacity,
+                    regionHeaderSize,
+                    cacheRegionConsumer,
+                    fileDataIndexRegionHelper);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
similarity index 79%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
index 08726fb8afb..443bdbf863e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
@@ -26,8 +26,8 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 
-/** Utils for read and write {@link InternalRegion}. */
-public class InternalRegionWriteReadUtils {
+/** Utils for read and write {@link FileDataIndexRegionHelper.Region}. */
+public class FileRegionWriteReadUtils {
 
     /**
      * Allocate a buffer with specific size and configure it to native order.
@@ -44,18 +44,21 @@ public class InternalRegionWriteReadUtils {
     /**
      * Write {@link InternalRegion} to {@link FileChannel}.
      *
+     * <p>Note that this type of region's length may be variable because it 
contains an array to
+     * indicate each buffer's release state.
+     *
      * @param channel the file's channel to write.
      * @param headerBuffer the buffer to write {@link InternalRegion}'s header.
      * @param region the region to be written to channel.
      */
-    public static void writeRegionToFile(
+    public static void writeHsInternalRegionToFile(
             FileChannel channel, ByteBuffer headerBuffer, InternalRegion 
region)
             throws IOException {
         // write header buffer.
         headerBuffer.clear();
         headerBuffer.putInt(region.getFirstBufferIndex());
         headerBuffer.putInt(region.getNumBuffers());
-        headerBuffer.putLong(region.getFirstBufferOffset());
+        headerBuffer.putLong(region.getRegionFileOffset());
         headerBuffer.flip();
 
         // write payload buffer.
@@ -76,22 +79,25 @@ public class InternalRegionWriteReadUtils {
     /**
      * Read {@link InternalRegion} from {@link FileChannel}.
      *
+     * <p>Note that this type of region's length may be variable because it 
contains an array to
+     * indicate each buffer's release state.
+     *
      * @param channel the channel to read.
      * @param headerBuffer the buffer to read {@link InternalRegion}'s header.
-     * @param position position to start read.
+     * @param fileOffset the file offset to start read.
      * @return the {@link InternalRegion} that read from this channel.
      */
-    public static InternalRegion readRegionFromFile(
-            FileChannel channel, ByteBuffer headerBuffer, long position) 
throws IOException {
+    public static InternalRegion readHsInternalRegionFromFile(
+            FileChannel channel, ByteBuffer headerBuffer, long fileOffset) 
throws IOException {
         headerBuffer.clear();
-        BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, 
position);
+        BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, 
fileOffset);
         headerBuffer.flip();
         int firstBufferIndex = headerBuffer.getInt();
         int numBuffers = headerBuffer.getInt();
         long firstBufferOffset = headerBuffer.getLong();
         ByteBuffer payloadBuffer = allocateAndConfigureBuffer(numBuffers);
         BufferReaderWriterUtil.readByteBufferFully(
-                channel, payloadBuffer, position + InternalRegion.HEADER_SIZE);
+                channel, payloadBuffer, fileOffset + 
InternalRegion.HEADER_SIZE);
         boolean[] released = new boolean[numBuffers];
         payloadBuffer.flip();
         for (int i = 0; i < numBuffers; i++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 2e49c60699a..856065a0421 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -105,7 +105,7 @@ public class NettyShuffleEnvironmentConfiguration {
 
     private final int maxOverdraftBuffersPerGate;
 
-    private final int hybridShuffleSpilledIndexSegmentSize;
+    private final int hybridShuffleSpilledIndexRegionGroupSize;
 
     private final long hybridShuffleNumRetainedInMemoryRegionsMax;
 
@@ -134,7 +134,7 @@ public class NettyShuffleEnvironmentConfiguration {
             int maxNumberOfConnections,
             boolean connectionReuseEnabled,
             int maxOverdraftBuffersPerGate,
-            int hybridShuffleSpilledIndexSegmentSize,
+            int hybridShuffleSpilledIndexRegionGroupSize,
             long hybridShuffleNumRetainedInMemoryRegionsMax,
             @Nullable TieredStorageConfiguration tieredStorageConfiguration) {
 
@@ -160,7 +160,7 @@ public class NettyShuffleEnvironmentConfiguration {
         this.maxNumberOfConnections = maxNumberOfConnections;
         this.connectionReuseEnabled = connectionReuseEnabled;
         this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
-        this.hybridShuffleSpilledIndexSegmentSize = 
hybridShuffleSpilledIndexSegmentSize;
+        this.hybridShuffleSpilledIndexRegionGroupSize = 
hybridShuffleSpilledIndexRegionGroupSize;
         this.hybridShuffleNumRetainedInMemoryRegionsMax =
                 hybridShuffleNumRetainedInMemoryRegionsMax;
         this.tieredStorageConfiguration = tieredStorageConfiguration;
@@ -264,8 +264,8 @@ public class NettyShuffleEnvironmentConfiguration {
         return hybridShuffleNumRetainedInMemoryRegionsMax;
     }
 
-    public int getHybridShuffleSpilledIndexSegmentSize() {
-        return hybridShuffleSpilledIndexSegmentSize;
+    public int getHybridShuffleSpilledIndexRegionGroupSize() {
+        return hybridShuffleSpilledIndexRegionGroupSize;
     }
 
     public TieredStorageConfiguration getTieredStorageConfiguration() {
@@ -376,7 +376,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
         int hybridShuffleSpilledIndexSegmentSize =
                 configuration.get(
-                        
NettyShuffleEnvironmentOptions.HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE);
+                        NettyShuffleEnvironmentOptions
+                                
.HYBRID_SHUFFLE_SPILLED_INDEX_REGION_GROUP_SIZE);
 
         long hybridShuffleNumRetainedInMemoryRegionsMax =
                 configuration.get(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index 279f12d2782..bbb88314b4c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -25,12 +25,15 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion.getContainBufferFunction;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test utils for hybrid shuffle mode. */
@@ -73,37 +76,55 @@ public class HybridShuffleTestUtils {
         return new HsOutputMetrics(new TestCounter(), new TestCounter());
     }
 
-    public static InternalRegion createSingleUnreleasedRegion(
+    public static TestingFileDataIndexRegion createSingleTestRegion(
             int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion) {
-        return new InternalRegion(
-                firstBufferIndex,
-                firstBufferOffset,
-                numBuffersPerRegion,
-                new boolean[numBuffersPerRegion]);
+        return new TestingFileDataIndexRegion.Builder()
+                .setGetSizeSupplier(() -> 
TestingFileDataIndexRegion.REGION_SIZE)
+                .setContainBufferFunction(
+                        bufferIndex ->
+                                getContainBufferFunction(
+                                        bufferIndex, firstBufferIndex, 
numBuffersPerRegion))
+                .setGetFirstBufferIndexSupplier(() -> firstBufferIndex)
+                .setGetRegionFileOffsetSupplier(() -> firstBufferOffset)
+                .setGetNumBuffersSupplier(() -> numBuffersPerRegion)
+                .build();
     }
 
-    public static List<InternalRegion> createAllUnreleasedRegions(
+    public static List<TestingFileDataIndexRegion> createTestRegions(
             int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion, int numRegions) {
-        List<InternalRegion> regions = new ArrayList<>();
+        List<TestingFileDataIndexRegion> regions = new ArrayList<>();
         int bufferIndex = firstBufferIndex;
         long bufferOffset = firstBufferOffset;
+        int numRegionSize = TestingFileDataIndexRegion.REGION_SIZE;
         for (int i = 0; i < numRegions; i++) {
+            final int currentBufferIndex = bufferIndex;
+            final long currentBufferOffset = bufferOffset;
             regions.add(
-                    new InternalRegion(
-                            bufferIndex,
-                            bufferOffset,
-                            numBuffersPerRegion,
-                            new boolean[numBuffersPerRegion]));
+                    new TestingFileDataIndexRegion.Builder()
+                            .setGetSizeSupplier(() -> numRegionSize)
+                            .setGetFirstBufferIndexSupplier(() -> 
currentBufferIndex)
+                            .setGetRegionFileOffsetSupplier(() -> 
currentBufferOffset)
+                            .setGetNumBuffersSupplier(() -> 
numBuffersPerRegion)
+                            .setContainBufferFunction(
+                                    index ->
+                                            getContainBufferFunction(
+                                                    index, firstBufferIndex, 
numBuffersPerRegion))
+                            .build());
             bufferIndex += numBuffersPerRegion;
             bufferOffset += bufferOffset;
         }
         return regions;
     }
 
-    public static void assertRegionEquals(InternalRegion expected, 
InternalRegion region) {
+    public static void assertRegionEquals(
+            FileDataIndexRegionHelper.Region expected, 
FileDataIndexRegionHelper.Region region) {
         
assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex());
-        
assertThat(region.getFirstBufferOffset()).isEqualTo(expected.getFirstBufferOffset());
-        assertThat(region.getNumBuffers()).isEqualTo(region.getNumBuffers());
-        assertThat(region.getReleased()).isEqualTo(region.getReleased());
+        
assertThat(region.getRegionFileOffset()).isEqualTo(expected.getRegionFileOffset());
+        assertThat(region.getNumBuffers()).isEqualTo(expected.getNumBuffers());
+        if (expected instanceof InternalRegion) {
+            assertThat(region).isInstanceOf(InternalRegion.class);
+            assertThat(((InternalRegion) region).getReleased())
+                    .isEqualTo(((InternalRegion) expected).getReleased());
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
index 0c287c9fb25..2971214c0bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager;
 
 import javax.annotation.Nullable;
 
@@ -29,17 +30,17 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.BiConsumer;
 
-/** Mock {@link HsFileDataIndexSpilledRegionManager} for testing. */
-public class TestingFileDataIndexSpilledRegionManager
-        implements HsFileDataIndexSpilledRegionManager {
-    private final List<TreeMap<Integer, InternalRegion>> regions;
+/** Mock {@link FileDataIndexSpilledRegionManager} for testing. */
+public class TestingFileDataIndexSpilledRegionManager<T extends 
FileDataIndexRegionHelper.Region>
+        implements FileDataIndexSpilledRegionManager<T> {
+    private final List<TreeMap<Integer, T>> regions;
 
-    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
+    private final BiConsumer<Integer, T> cacheRegionConsumer;
 
     private int findRegionInvoked = 0;
 
     public TestingFileDataIndexSpilledRegionManager(
-            int numSubpartitions, BiConsumer<Integer, InternalRegion> 
cacheRegionConsumer) {
+            int numSubpartitions, BiConsumer<Integer, T> cacheRegionConsumer) {
         this.regions = new ArrayList<>();
         this.cacheRegionConsumer = cacheRegionConsumer;
         for (int i = 0; i < numSubpartitions; i++) {
@@ -48,7 +49,7 @@ public class TestingFileDataIndexSpilledRegionManager
     }
 
     @Nullable
-    public InternalRegion getRegion(int subpartition, int bufferIndex) {
+    public T getRegion(int subpartition, int bufferIndex) {
         return regions.get(subpartition).get(bufferIndex);
     }
 
@@ -61,15 +62,14 @@ public class TestingFileDataIndexSpilledRegionManager
     }
 
     @Override
-    public void appendOrOverwriteRegion(int subpartition, InternalRegion 
region)
-            throws IOException {
+    public void appendOrOverwriteRegion(int subpartition, T region) {
         regions.get(subpartition).put(region.getFirstBufferIndex(), region);
     }
 
     @Override
     public long findRegion(int subpartition, int bufferIndex, boolean 
loadToCache) {
         findRegionInvoked++;
-        InternalRegion region = regions.get(subpartition).get(bufferIndex);
+        T region = regions.get(subpartition).get(bufferIndex);
         if (region == null) {
             return -1;
         } else {
@@ -87,22 +87,22 @@ public class TestingFileDataIndexSpilledRegionManager
     }
 
     /** Factory of {@link TestingFileDataIndexSpilledRegionManager}. */
-    public static class Factory implements 
HsFileDataIndexSpilledRegionManager.Factory {
-        public static final Factory INSTANCE = new Factory();
+    public static class Factory<T extends FileDataIndexRegionHelper.Region>
+            implements FileDataIndexSpilledRegionManager.Factory<T> {
 
-        public TestingFileDataIndexSpilledRegionManager 
lastSpilledRegionManager;
+        public TestingFileDataIndexSpilledRegionManager<T> 
lastSpilledRegionManager;
 
-        public TestingFileDataIndexSpilledRegionManager 
getLastSpilledRegionManager() {
+        public TestingFileDataIndexSpilledRegionManager<T> 
getLastSpilledRegionManager() {
             return lastSpilledRegionManager;
         }
 
         @Override
-        public HsFileDataIndexSpilledRegionManager create(
+        public FileDataIndexSpilledRegionManager<T> create(
                 int numSubpartitions,
                 Path indexFilePath,
-                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
-            TestingFileDataIndexSpilledRegionManager 
testingFileDataIndexSpilledRegionManager =
-                    new TestingFileDataIndexSpilledRegionManager(
+                BiConsumer<Integer, T> cacheRegionConsumer) {
+            TestingFileDataIndexSpilledRegionManager<T> 
testingFileDataIndexSpilledRegionManager =
+                    new TestingFileDataIndexSpilledRegionManager<>(
                             numSubpartitions, cacheRegionConsumer);
             lastSpilledRegionManager = 
testingFileDataIndexSpilledRegionManager;
             return testingFileDataIndexSpilledRegionManager;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
similarity index 62%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
index 5e1f79dce32..1ffd3620975 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.TestingFileDataIndexSpilledRegionManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -31,44 +31,44 @@ import java.util.Optional;
 import java.util.UUID;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestRegions;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link HsFileDataIndexCache}. */
-class HsFileDataIndexCacheTest {
-    private HsFileDataIndexCache indexCache;
+/** Tests for {@link FileDataIndexCache}. */
+class FileDataIndexCacheTest {
+    private FileDataIndexCache<TestingFileDataIndexRegion> indexCache;
 
-    private TestingFileDataIndexSpilledRegionManager spilledRegionManager;
+    private 
TestingFileDataIndexSpilledRegionManager<TestingFileDataIndexRegion>
+            spilledRegionManager;
 
     private final int numSubpartitions = 1;
 
-    private static final int SPILLED_INDEX_SEGMENT_SIZE = 256;
-
     private int numRetainedIndexEntry = 10;
 
     @BeforeEach
     void before(@TempDir Path tmpPath) throws Exception {
         Path indexFilePath = 
Files.createFile(tmpPath.resolve(UUID.randomUUID().toString()));
+        
TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion>
+                testingSpilledRegionManagerFactory =
+                        new 
TestingFileDataIndexSpilledRegionManager.Factory<>();
         indexCache =
-                new HsFileDataIndexCache(
+                new FileDataIndexCache<>(
                         numSubpartitions,
                         indexFilePath,
                         numRetainedIndexEntry,
-                        
TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE);
-        spilledRegionManager =
-                TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE
-                        .getLastSpilledRegionManager();
+                        testingSpilledRegionManagerFactory);
+        spilledRegionManager = 
testingSpilledRegionManagerFactory.getLastSpilledRegionManager();
     }
 
     @Test
     void testPutAndGet() {
-        indexCache.put(0, createAllUnreleasedRegions(0, 0L, 3, 1));
-        Optional<InternalRegion> regionOpt = indexCache.get(0, 0);
+        indexCache.put(0, createTestRegions(0, 0L, 3, 1));
+        Optional<TestingFileDataIndexRegion> regionOpt = indexCache.get(0, 0);
         assertThat(regionOpt)
                 .hasValueSatisfying(
                         (region) -> {
                             
assertThat(region.getFirstBufferIndex()).isEqualTo(0);
-                            
assertThat(region.getFirstBufferOffset()).isEqualTo(0);
+                            
assertThat(region.getRegionFileOffset()).isEqualTo(0);
                             assertThat(region.getNumBuffers()).isEqualTo(3);
                         });
     }
@@ -77,38 +77,39 @@ class HsFileDataIndexCacheTest {
     void testCachedRegionRemovedWhenExceedsRetainedEntry(@TempDir Path 
tmpPath) throws Exception {
         numRetainedIndexEntry = 3;
         Path indexFilePath = 
Files.createFile(tmpPath.resolve(UUID.randomUUID().toString()));
+        
TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion>
+                testingSpilledRegionManagerFactory =
+                        new 
TestingFileDataIndexSpilledRegionManager.Factory<>();
         indexCache =
-                new HsFileDataIndexCache(
+                new FileDataIndexCache<>(
                         numSubpartitions,
                         indexFilePath,
                         numRetainedIndexEntry,
-                        
TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE);
-        spilledRegionManager =
-                TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE
-                        .getLastSpilledRegionManager();
+                        testingSpilledRegionManagerFactory);
+        spilledRegionManager = 
testingSpilledRegionManagerFactory.getLastSpilledRegionManager();
 
         // region 0, 1, 2
-        List<InternalRegion> regionList = createAllUnreleasedRegions(0, 0L, 3, 
3);
+        List<TestingFileDataIndexRegion> regionList = createTestRegions(0, 0L, 
3, 3);
         indexCache.put(0, regionList);
         assertThat(spilledRegionManager.getSpilledRegionSize(0)).isZero();
 
         // number of regions exceeds numRetainedIndexEntry, trigger cache 
purge.
-        indexCache.put(0, createAllUnreleasedRegions(9, 9L, 3, 1));
+        indexCache.put(0, createTestRegions(9, 9L, 3, 1));
         assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1);
-        InternalRegion region = spilledRegionManager.getRegion(0, 0);
+        TestingFileDataIndexRegion region = spilledRegionManager.getRegion(0, 
0);
         assertThat(region).isNotNull();
         assertRegionEquals(region, regionList.get(0));
         // number of regions exceeds numRetainedIndexEntry, trigger cache 
purge.
-        indexCache.put(0, createAllUnreleasedRegions(12, 12L, 3, 1));
+        indexCache.put(0, createTestRegions(12, 12L, 3, 1));
         assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2);
-        InternalRegion region2 = spilledRegionManager.getRegion(0, 3);
+        TestingFileDataIndexRegion region2 = spilledRegionManager.getRegion(0, 
3);
         assertThat(region2).isNotNull();
         assertRegionEquals(region2, regionList.get(1));
     }
 
     @Test
     void testGetNonExistentRegion() {
-        Optional<InternalRegion> region = indexCache.get(0, 0);
+        Optional<TestingFileDataIndexRegion> region = indexCache.get(0, 0);
         // get a non-existent region.
         assertThat(region).isNotPresent();
     }
@@ -117,21 +118,22 @@ class HsFileDataIndexCacheTest {
     void testCacheLoadSpilledRegion(@TempDir Path tmpPath) throws Exception {
         numRetainedIndexEntry = 1;
         Path indexFilePath = 
Files.createFile(tmpPath.resolve(UUID.randomUUID().toString()));
+        
TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion>
+                testingSpilledRegionManagerFactory =
+                        new 
TestingFileDataIndexSpilledRegionManager.Factory<>();
         indexCache =
-                new HsFileDataIndexCache(
+                new FileDataIndexCache<>(
                         numSubpartitions,
                         indexFilePath,
                         numRetainedIndexEntry,
-                        
TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE);
-        spilledRegionManager =
-                TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE
-                        .getLastSpilledRegionManager();
+                        testingSpilledRegionManagerFactory);
+        spilledRegionManager = 
testingSpilledRegionManagerFactory.getLastSpilledRegionManager();
 
-        indexCache.put(0, createAllUnreleasedRegions(0, 0L, 1, 2));
+        indexCache.put(0, createTestRegions(0, 0L, 1, 2));
         assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1);
 
         assertThat(spilledRegionManager.getFindRegionInvoked()).isZero();
-        Optional<InternalRegion> regionOpt = indexCache.get(0, 0);
+        Optional<TestingFileDataIndexRegion> regionOpt = indexCache.get(0, 0);
         assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2);
         assertThat(regionOpt).isPresent();
         assertThat(spilledRegionManager.getFindRegionInvoked()).isEqualTo(1);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java
similarity index 51%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java
index bb4c31a4f2d..e488a7f2967 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
-
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -34,13 +32,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleTestRegion;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestRegions;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion.readRegionFromFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link HsFileDataIndexSpilledRegionManagerImpl}. */
-class HsFileDataIndexSpilledRegionManagerImplTest {
+/** Tests for {@link FileDataIndexSpilledRegionManagerImpl}. */
+class FileDataIndexSpilledRegionManagerImplTest {
     private Path indexFilePath;
 
     @BeforeEach
@@ -51,7 +49,7 @@ class HsFileDataIndexSpilledRegionManagerImplTest {
     @Test
     void testFindNonExistentRegion() throws Exception {
         CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
-        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> 
spilledRegionManager =
                 createSpilledRegionManager(
                         (ignore1, ignore2) -> 
cachedRegionFuture.complete(null))) {
             long regionOffset = spilledRegionManager.findRegion(0, 0, true);
@@ -63,114 +61,100 @@ class HsFileDataIndexSpilledRegionManagerImplTest {
     @Test
     void testAppendOrOverwriteRegion() throws Exception {
         CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
-        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> 
spilledRegionManager =
                 createSpilledRegionManager(
                         (ignore1, ignore2) -> 
cachedRegionFuture.complete(null))) {
-            InternalRegion region = createSingleUnreleasedRegion(0, 0L, 1);
+            TestingFileDataIndexRegion region = createSingleTestRegion(0, 0L, 
1);
             // append region to index file.
             spilledRegionManager.appendOrOverwriteRegion(0, region);
             assertThat(cachedRegionFuture).isNotCompleted();
             FileChannel indexFileChannel = FileChannel.open(indexFilePath, 
StandardOpenOption.READ);
-            InternalRegion readRegion =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
-                            indexFileChannel,
-                            
allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
-                            0L);
+            TestingFileDataIndexRegion readRegion = 
readRegionFromFile(indexFileChannel, 0L);
             assertRegionEquals(readRegion, region);
 
             // new region must have the same size of old region.
-            InternalRegion newRegion = createSingleUnreleasedRegion(0, 10L, 1);
+            TestingFileDataIndexRegion newRegion = createSingleTestRegion(0, 
10L, 1);
             // overwrite old region.
             spilledRegionManager.appendOrOverwriteRegion(0, newRegion);
             // appendOrOverwriteRegion will not trigger cache load.
             assertThat(cachedRegionFuture).isNotCompleted();
-            InternalRegion readNewRegion =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
-                            indexFileChannel,
-                            
allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
-                            0L);
+            TestingFileDataIndexRegion readNewRegion = 
readRegionFromFile(indexFileChannel, 0L);
             assertRegionEquals(readNewRegion, newRegion);
         }
     }
 
     @Test
-    void testWriteMoreThanOneSegment() throws Exception {
-        List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 2, 2);
-        int segmentSize = 
regions.stream().mapToInt(InternalRegion::getSize).sum() + 1;
-        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
-                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> 
{})) {
+    void testWriteMoreThanOneRegionGroup() throws Exception {
+        List<TestingFileDataIndexRegion> regions = createTestRegions(0, 0L, 2, 
2);
+        int regionGroupSize =
+                
regions.stream().mapToInt(TestingFileDataIndexRegion::getSize).sum() + 1;
+        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> 
spilledRegionManager =
+                createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) 
-> {})) {
             spilledRegionManager.appendOrOverwriteRegion(0, regions.get(0));
             spilledRegionManager.appendOrOverwriteRegion(0, regions.get(1));
-            // segment has no enough space, will start new segment.
-            InternalRegion regionInNewSegment = 
createSingleUnreleasedRegion(4, 4L, 2);
-            spilledRegionManager.appendOrOverwriteRegion(0, 
regionInNewSegment);
+            // region group has no enough space, will start new region group.
+            TestingFileDataIndexRegion regionInNewRegionGroup = 
createSingleTestRegion(4, 4L, 2);
+            spilledRegionManager.appendOrOverwriteRegion(0, 
regionInNewRegionGroup);
             FileChannel indexFileChannel = FileChannel.open(indexFilePath, 
StandardOpenOption.READ);
-            InternalRegion readRegion =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
+            TestingFileDataIndexRegion readRegion =
+                    readRegionFromFile(
                             indexFileChannel,
-                            
allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
-                            // offset is segment size instead of two regions 
size to prove that new
-                            // segment is started.
-                            segmentSize);
-            assertRegionEquals(readRegion, regionInNewSegment);
+                            // offset is region group size instead of two 
regions size to prove that
+                            // new region group is started.
+                            regionGroupSize);
+            assertRegionEquals(readRegion, regionInNewRegionGroup);
         }
     }
 
     @Test
     void testWriteBigRegion() throws Exception {
-        int segmentSize = 4;
-        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
-                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> 
{})) {
-            List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 
1, 2);
-            InternalRegion region1 = regions.get(0);
-            InternalRegion region2 = regions.get(1);
-            assertThat(region1.getSize()).isGreaterThan(segmentSize);
-            assertThat(region2.getSize()).isGreaterThan(segmentSize);
+        int regionGroupSize = 4;
+        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> 
spilledRegionManager =
+                createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) 
-> {})) {
+            List<TestingFileDataIndexRegion> regions = createTestRegions(0, 
0L, 1, 2);
+            TestingFileDataIndexRegion region1 = regions.get(0);
+            TestingFileDataIndexRegion region2 = regions.get(1);
+            assertThat(region1.getSize()).isGreaterThan(regionGroupSize);
+            assertThat(region2.getSize()).isGreaterThan(regionGroupSize);
 
             spilledRegionManager.appendOrOverwriteRegion(0, region1);
             spilledRegionManager.appendOrOverwriteRegion(0, region2);
             FileChannel indexFileChannel = FileChannel.open(indexFilePath, 
StandardOpenOption.READ);
-            InternalRegion readRegion1 =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
-                            indexFileChannel,
-                            
allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
-                            0L);
+            TestingFileDataIndexRegion readRegion1 = 
readRegionFromFile(indexFileChannel, 0L);
             assertRegionEquals(readRegion1, region1);
 
-            InternalRegion readRegion2 =
-                    InternalRegionWriteReadUtils.readRegionFromFile(
-                            indexFileChannel,
-                            
allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
-                            readRegion1.getSize());
+            TestingFileDataIndexRegion readRegion2 =
+                    readRegionFromFile(indexFileChannel, 
readRegion1.getSize());
             assertRegionEquals(readRegion2, region2);
         }
     }
 
     @Test
-    void testFindRegionFirstBufferIndexInMultipleSegments() throws Exception {
+    void testFindRegionFirstBufferIndexInMultipleRegionGroups() throws 
Exception {
         final int numBuffersPerRegion = 2;
         final int subpartition = 0;
-        List<InternalRegion> loadedRegions = new ArrayList<>();
-        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+        List<TestingFileDataIndexRegion> loadedRegions = new ArrayList<>();
+        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> 
spilledRegionManager =
                 createSpilledRegionManager(
-                        // every segment can store two regions.
-                        (InternalRegion.HEADER_SIZE + numBuffersPerRegion) * 2,
+                        // every region group can store two regions.
+                        TestingFileDataIndexRegion.REGION_SIZE * 2,
                         (ignore, region) -> loadedRegions.add(region))) {
-            // segment1: region1(0-2), region2(9-11), min:0, max: 11
+            // region group1: region1(0-1), region2(9-10), min:0, max: 10
             spilledRegionManager.appendOrOverwriteRegion(
-                    subpartition, createSingleUnreleasedRegion(0, 0L, 
numBuffersPerRegion));
+                    subpartition, createSingleTestRegion(0, 0L, 
numBuffersPerRegion));
             spilledRegionManager.appendOrOverwriteRegion(
-                    subpartition, createSingleUnreleasedRegion(9, 9L, 
numBuffersPerRegion));
+                    subpartition, createSingleTestRegion(9, 9L, 
numBuffersPerRegion));
 
-            // segment2: region1(2-4), region2(11-13) min: 2, max: 13
-            InternalRegion targetRegion = createSingleUnreleasedRegion(2, 2L, 
2);
+            // region group2: region1(2-3), region2(11-12) min: 2, max: 12
+            TestingFileDataIndexRegion targetRegion =
+                    createSingleTestRegion(2, 2L, numBuffersPerRegion);
             spilledRegionManager.appendOrOverwriteRegion(subpartition, 
targetRegion);
             spilledRegionManager.appendOrOverwriteRegion(
-                    subpartition, createSingleUnreleasedRegion(11, 11L, 2));
+                    subpartition, createSingleTestRegion(11, 11L, 
numBuffersPerRegion));
 
-            // segment3: region1(7-9) min: 7, max: 9
+            // region group3: region1(7-8) min: 7, max: 8
             spilledRegionManager.appendOrOverwriteRegion(
-                    subpartition, createSingleUnreleasedRegion(7, 7L, 2));
+                    subpartition, createSingleTestRegion(7, 7L, 
numBuffersPerRegion));
 
             // find target region
             long regionOffset = spilledRegionManager.findRegion(subpartition, 
3, true);
@@ -181,15 +165,27 @@ class HsFileDataIndexSpilledRegionManagerImplTest {
         }
     }
 
-    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
-            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+    private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion>
+            createSpilledRegionManager(
+                    BiConsumer<Integer, TestingFileDataIndexRegion> 
cacheRegionConsumer) {
         return createSpilledRegionManager(256, cacheRegionConsumer);
     }
 
-    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
-            int segmentSize, BiConsumer<Integer, InternalRegion> 
cacheRegionConsumer) {
+    private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion>
+            createSpilledRegionManager(
+                    int regionGroupSize,
+                    BiConsumer<Integer, TestingFileDataIndexRegion> 
cacheRegionConsumer) {
         int numSubpartitions = 2;
-        return new 
HsFileDataIndexSpilledRegionManagerImpl.Factory(segmentSize, Long.MAX_VALUE)
+        return new FileDataIndexSpilledRegionManagerImpl.Factory<>(
+                        regionGroupSize,
+                        Long.MAX_VALUE,
+                        TestingFileDataIndexRegion.REGION_SIZE,
+                        new TestingFileDataIndexRegionHelper.Builder()
+                                .setReadRegionFromFileFunction(
+                                        
TestingFileDataIndexRegion::readRegionFromFile)
+                                .setWriteRegionToFileConsumer(
+                                        
TestingFileDataIndexRegion::writeRegionToFile)
+                                .build())
                 .create(numSubpartitions, indexFilePath, cacheRegionConsumer);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
similarity index 65%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
index e65a8a132d9..fc2cf847002 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid;
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -34,16 +34,15 @@ import java.util.UUID;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link InternalRegionWriteReadUtils}. */
-class InternalRegionWriteReadUtilsTest {
+/** Tests for {@link FileRegionWriteReadUtils}. */
+class FileRegionWriteReadUtilsTest {
     @Test
     void testAllocateAndConfigureBuffer() {
         final int bufferSize = 16;
-        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize);
         assertThat(buffer.capacity()).isEqualTo(16);
         assertThat(buffer.limit()).isEqualTo(16);
         assertThat(buffer.position()).isZero();
@@ -52,27 +51,30 @@ class InternalRegionWriteReadUtilsTest {
     }
 
     @Test
-    void testReadPrematureEndOfFile(@TempDir Path tmpPath) throws Exception {
+    void testReadPrematureEndOfFileForHsInternalRegion(@TempDir Path tmpPath) 
throws Exception {
         FileChannel channel = tmpFileChannel(tmpPath);
-        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
-        InternalRegionWriteReadUtils.writeRegionToFile(
-                channel, buffer, createSingleUnreleasedRegion(0, 0L, 1));
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
+        FileRegionWriteReadUtils.writeHsInternalRegionToFile(
+                channel, buffer, new HsFileDataIndexImpl.InternalRegion(0, 0, 
1, new boolean[1]));
         channel.truncate(channel.position() - 1);
         buffer.flip();
         assertThatThrownBy(
-                        () -> 
InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L))
+                        () ->
+                                
FileRegionWriteReadUtils.readHsInternalRegionFromFile(
+                                        channel, buffer, 0L))
                 .isInstanceOf(IOException.class);
     }
 
     @Test
-    void testWriteAndReadRegion(@TempDir Path tmpPath) throws Exception {
+    void testWriteAndReadHsInternalRegion(@TempDir Path tmpPath) throws 
Exception {
         FileChannel channel = tmpFileChannel(tmpPath);
-        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
-        InternalRegion region = createSingleUnreleasedRegion(10, 100L, 1);
-        InternalRegionWriteReadUtils.writeRegionToFile(channel, buffer, 
region);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
+        HsFileDataIndexImpl.InternalRegion region =
+                new HsFileDataIndexImpl.InternalRegion(0, 0, 1, new 
boolean[1]);
+        FileRegionWriteReadUtils.writeHsInternalRegionToFile(channel, buffer, 
region);
         buffer.flip();
-        InternalRegion readRegion =
-                InternalRegionWriteReadUtils.readRegionFromFile(channel, 
buffer, 0L);
+        HsFileDataIndexImpl.InternalRegion readRegion =
+                FileRegionWriteReadUtils.readHsInternalRegionFromFile(channel, 
buffer, 0L);
         assertRegionEquals(readRegion, region);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
new file mode 100644
index 00000000000..29ee310fba8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
+
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer;
+
+/** Testing implementation for {@link FileDataIndexRegionHelper.Region}. */
+public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Region {
+
+    public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
+
+    private final Supplier<Integer> getSizeSupplier;
+
+    private final Supplier<Integer> getFirstBufferIndexSupplier;
+
+    private final Supplier<Long> getRegionFileOffsetSupplier;
+
+    private final Supplier<Integer> getNumBuffersSupplier;
+
+    private final Function<Integer, Boolean> containBufferFunction;
+
+    private TestingFileDataIndexRegion(
+            Supplier<Integer> getSizeSupplier,
+            Supplier<Integer> getFirstBufferIndexSupplier,
+            Supplier<Long> getRegionFileOffsetSupplier,
+            Supplier<Integer> getNumBuffersSupplier,
+            Function<Integer, Boolean> containBufferFunction) {
+        this.getSizeSupplier = getSizeSupplier;
+        this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier;
+        this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier;
+        this.getNumBuffersSupplier = getNumBuffersSupplier;
+        this.containBufferFunction = containBufferFunction;
+    }
+
+    @Override
+    public int getSize() {
+        return getSizeSupplier.get();
+    }
+
+    @Override
+    public int getFirstBufferIndex() {
+        return getFirstBufferIndexSupplier.get();
+    }
+
+    @Override
+    public long getRegionFileOffset() {
+        return getRegionFileOffsetSupplier.get();
+    }
+
+    @Override
+    public int getNumBuffers() {
+        return getNumBuffersSupplier.get();
+    }
+
+    @Override
+    public boolean containBuffer(int bufferIndex) {
+        return containBufferFunction.apply(bufferIndex);
+    }
+
+    public static void writeRegionToFile(FileChannel channel, 
TestingFileDataIndexRegion region)
+            throws IOException {
+        ByteBuffer regionBuffer = allocateHeaderBuffer();
+        regionBuffer.clear();
+        regionBuffer.putInt(region.getFirstBufferIndex());
+        regionBuffer.putInt(region.getNumBuffers());
+        regionBuffer.putLong(region.getRegionFileOffset());
+        regionBuffer.flip();
+        BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), 
regionBuffer);
+    }
+
+    public static TestingFileDataIndexRegion readRegionFromFile(
+            FileChannel channel, long fileOffset) throws IOException {
+        ByteBuffer regionBuffer = allocateHeaderBuffer();
+        regionBuffer.clear();
+        BufferReaderWriterUtil.readByteBufferFully(channel, regionBuffer, 
fileOffset);
+        regionBuffer.flip();
+        int firstBufferIndex = regionBuffer.getInt();
+        int numBuffers = regionBuffer.getInt();
+        long firstBufferOffset = regionBuffer.getLong();
+        return new TestingFileDataIndexRegion.Builder()
+                .setGetSizeSupplier(() -> 
TestingFileDataIndexRegion.REGION_SIZE)
+                .setGetFirstBufferIndexSupplier(() -> firstBufferIndex)
+                .setGetRegionFileOffsetSupplier(() -> firstBufferOffset)
+                .setGetNumBuffersSupplier(() -> numBuffers)
+                .setContainBufferFunction(
+                        bufferIndex ->
+                                getContainBufferFunction(bufferIndex, 
firstBufferIndex, numBuffers))
+                .build();
+    }
+
+    private static ByteBuffer allocateHeaderBuffer() {
+        return 
allocateAndConfigureBuffer(TestingFileDataIndexRegion.REGION_SIZE);
+    }
+
+    public static boolean getContainBufferFunction(
+            int bufferIndex, int firstBufferIndex, int numBuffers) {
+        return bufferIndex >= firstBufferIndex && bufferIndex < 
firstBufferIndex + numBuffers;
+    }
+
+    /** Builder for {@link TestingFileDataIndexRegion}. */
+    public static class Builder {
+
+        private Supplier<Integer> getSizeSupplier = () -> 0;
+
+        private Supplier<Integer> getFirstBufferIndexSupplier = () -> 0;
+
+        private Supplier<Long> getRegionFileOffsetSupplier = () -> 0L;
+
+        private Supplier<Integer> getNumBuffersSupplier = () -> 0;
+
+        private Function<Integer, Boolean> containBufferFunction = bufferIndex 
-> false;
+
+        public TestingFileDataIndexRegion.Builder setGetSizeSupplier(
+                Supplier<Integer> getSizeSupplier) {
+            this.getSizeSupplier = getSizeSupplier;
+            return this;
+        }
+
+        public TestingFileDataIndexRegion.Builder 
setGetFirstBufferIndexSupplier(
+                Supplier<Integer> getFirstBufferIndexSupplier) {
+            this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier;
+            return this;
+        }
+
+        public TestingFileDataIndexRegion.Builder 
setGetRegionFileOffsetSupplier(
+                Supplier<Long> getRegionFileOffsetSupplier) {
+            this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier;
+            return this;
+        }
+
+        public TestingFileDataIndexRegion.Builder setGetNumBuffersSupplier(
+                Supplier<Integer> getNumBuffersSupplier) {
+            this.getNumBuffersSupplier = getNumBuffersSupplier;
+            return this;
+        }
+
+        public TestingFileDataIndexRegion.Builder setContainBufferFunction(
+                Function<Integer, Boolean> containBufferFunction) {
+            this.containBufferFunction = containBufferFunction;
+            return this;
+        }
+
+        public TestingFileDataIndexRegion build() {
+            return new TestingFileDataIndexRegion(
+                    getSizeSupplier,
+                    getFirstBufferIndexSupplier,
+                    getRegionFileOffsetSupplier,
+                    getNumBuffersSupplier,
+                    containBufferFunction);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java
new file mode 100644
index 00000000000..3a36e3ca6ac
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
+
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * Testing implementation for {@link FileDataIndexRegionHelper}. Both operations delegate to
+ * lambdas supplied through the {@link Builder}, letting tests stub read/write behavior without
+ * touching real index files.
+ */
+public class TestingFileDataIndexRegionHelper
+        implements FileDataIndexRegionHelper<TestingFileDataIndexRegion> {
+
+    // Stubbed write behavior; the builder default is a no-op.
+    private final BiConsumerWithException<FileChannel, 
TestingFileDataIndexRegion, IOException>
+            writeRegionToFileConsumer;
+
+    // Stubbed read behavior; the builder default returns null.
+    private final BiFunctionWithException<
+                    FileChannel, Long, TestingFileDataIndexRegion, IOException>
+            readRegionFromFileFunction;
+
+    // Private: instances are created only through the Builder.
+    private TestingFileDataIndexRegionHelper(
+            BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, 
IOException>
+                    writeRegionToFileConsumer,
+            BiFunctionWithException<FileChannel, Long, 
TestingFileDataIndexRegion, IOException>
+                    readRegionFromFileFunction) {
+        this.writeRegionToFileConsumer = writeRegionToFileConsumer;
+        this.readRegionFromFileFunction = readRegionFromFileFunction;
+    }
+
+    // Delegates to the consumer configured via the builder.
+    @Override
+    public void writeRegionToFile(FileChannel channel, 
TestingFileDataIndexRegion region)
+            throws IOException {
+        writeRegionToFileConsumer.accept(channel, region);
+    }
+
+    // Delegates to the function configured via the builder.
+    @Override
+    public TestingFileDataIndexRegion readRegionFromFile(FileChannel channel, 
long fileOffset)
+            throws IOException {
+        return readRegionFromFileFunction.apply(channel, fileOffset);
+    }
+
+    /** Builder for {@link TestingFileDataIndexRegionHelper}. */
+    public static class Builder {
+        private BiConsumerWithException<FileChannel, 
TestingFileDataIndexRegion, IOException>
+                writeRegionToFileConsumer = (fileChannel, testRegion) -> {};
+
+        private BiFunctionWithException<FileChannel, Long, 
TestingFileDataIndexRegion, IOException>
+                readRegionFromFileFunction = (fileChannel, fileOffset) -> null;
+
+        /** Sets the stubbed write behavior; returns this builder for chaining. */
+        public TestingFileDataIndexRegionHelper.Builder 
setWriteRegionToFileConsumer(
+                BiConsumerWithException<FileChannel, 
TestingFileDataIndexRegion, IOException>
+                        writeRegionToFileConsumer) {
+            this.writeRegionToFileConsumer = writeRegionToFileConsumer;
+            return this;
+        }
+
+        /** Sets the stubbed read behavior; returns this builder for chaining. */
+        public TestingFileDataIndexRegionHelper.Builder 
setReadRegionFromFileFunction(
+                BiFunctionWithException<FileChannel, Long, 
TestingFileDataIndexRegion, IOException>
+                        readRegionFromFileFunction) {
+            this.readRegionFromFileFunction = readRegionFromFileFunction;
+            return this;
+        }
+
+        /** Creates a {@link TestingFileDataIndexRegionHelper} from the configured lambdas. */
+        public TestingFileDataIndexRegionHelper build() {
+            return new TestingFileDataIndexRegionHelper(
+                    writeRegionToFileConsumer, readRegionFromFileFunction);
+        }
+    }
+}

Reply via email to