This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e50e24797fbb154cdf7c484775f0fafa94cf34c
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jan 5 15:34:09 2023 +0800

    [FLINK-30332][network] HsFileDataIndex supports caching index entries and 
introduces config options to configure the cache size.
    
    This closes #21603
---
 .../generated/all_taskmanager_network_section.html | 12 +++++
 .../netty_shuffle_environment_configuration.html   | 12 +++++
 .../NettyShuffleEnvironmentOptions.java            | 18 ++++++++
 .../io/network/NettyShuffleServiceFactory.java     |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 35 +++++++++-----
 .../network/partition/hybrid/HsFileDataIndex.java  |  3 ++
 .../partition/hybrid/HsFileDataIndexImpl.java      | 53 +++++++++++++---------
 .../partition/hybrid/HsFileDataManager.java        |  7 +--
 .../partition/hybrid/HsResultPartition.java        | 12 ++++-
 .../hybrid/HybridShuffleConfiguration.java         | 46 +++++++++++++++++--
 .../NettyShuffleEnvironmentConfiguration.java      | 32 ++++++++++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../network/partition/ResultPartitionBuilder.java  | 21 ++++++++-
 .../partition/ResultPartitionFactoryTest.java      |  4 +-
 .../partition/hybrid/HsFileDataIndexImplTest.java  |  8 +++-
 .../partition/hybrid/HsFileDataManagerTest.java    | 13 ++++--
 .../partition/hybrid/HsMemoryDataManagerTest.java  | 11 ++++-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 22 ++++++---
 .../partition/hybrid/HsSubpartitionViewTest.java   |  3 +-
 .../partition/hybrid/TestingFileDataIndex.java     |  5 ++
 20 files changed, 279 insertions(+), 63 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html 
b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 34deb37d450..d4d5b2bed0a 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -32,6 +32,18 @@
             <td>Boolean</td>
             <td>Boolean flag to enable/disable more detailed metrics about 
inbound/outbound network queue lengths.</td>
         </tr>
+        <tr>
+            
<td><h5>taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max</h5></td>
+            <td style="word-wrap: break-word;">1048576</td>
+            <td>Long</td>
+            <td>Controls the max number of hybrid retained regions in 
memory.</td>
+        </tr>
+        <tr>
+            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>Controls the segment size(in bytes) of hybrid spilled file 
data index.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 4ed81c078e3..adcdef70d27 100644
--- 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -50,6 +50,18 @@
             <td>Boolean</td>
             <td>Boolean flag to enable/disable more detailed metrics about 
inbound/outbound network queue lengths.</td>
         </tr>
+        <tr>
+            
<td><h5>taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max</h5></td>
+            <td style="word-wrap: break-word;">1048576</td>
+            <td>Long</td>
+            <td>Controls the max number of hybrid retained regions in 
memory.</td>
+        </tr>
+        <tr>
+            
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>Controls the segment size(in bytes) of hybrid spilled file 
data index.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.max-num-tcp-connections</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 802ee049c34..5e943363d52 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -270,6 +270,24 @@ public class NettyShuffleEnvironmentOptions {
                                     // this raw value must be changed 
correspondingly
                                     
"taskmanager.memory.framework.off-heap.batch-shuffle.size"));
 
+    /** Segment size of hybrid spilled file data index. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE =
+            key("taskmanager.network.hybrid-shuffle.spill-index-segment-size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "Controls the segment size(in bytes) of hybrid 
spilled file data index.");
+
+    /** Max number of hybrid retained regions in memory. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Long> 
HYBRID_SHUFFLE_NUM_RETAINED_IN_MEMORY_REGIONS_MAX =
+            
key("taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max")
+                    .longType()
+                    .defaultValue(1024 * 1024L)
+                    .withDescription(
+                            "Controls the max number of hybrid retained 
regions in memory.");
+
     /** Number of max buffers can be used for each output subpartition. */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_MAX_BUFFERS_PER_CHANNEL =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index bd6847f29e4..e2f2166397b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -210,7 +210,9 @@ public class NettyShuffleServiceFactory
                         config.sortShuffleMinBuffers(),
                         config.sortShuffleMinParallelism(),
                         config.isSSLEnabled(),
-                        config.getMaxOverdraftBuffersPerGate());
+                        config.getMaxOverdraftBuffersPerGate(),
+                        config.getHybridShuffleSpilledIndexSegmentSize(),
+                        
config.getHybridShuffleNumRetainedInMemoryRegionsMax());
 
         SingleInputGateFactory singleInputGateFactory =
                 new SingleInputGateFactory(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 7858f4c6709..968fbcc80dc 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -75,6 +75,10 @@ public class ResultPartitionFactory {
 
     private final int sortShuffleMinParallelism;
 
+    private final int hybridShuffleSpilledIndexSegmentSize;
+
+    private final long hybridShuffleNumRetainedInMemoryRegionsMax;
+
     private final boolean sslEnabled;
 
     private final int maxOverdraftBuffersPerGate;
@@ -95,7 +99,9 @@ public class ResultPartitionFactory {
             int sortShuffleMinBuffers,
             int sortShuffleMinParallelism,
             boolean sslEnabled,
-            int maxOverdraftBuffersPerGate) {
+            int maxOverdraftBuffersPerGate,
+            int hybridShuffleSpilledIndexSegmentSize,
+            long hybridShuffleNumRetainedInMemoryRegionsMax) {
 
         this.partitionManager = partitionManager;
         this.channelManager = channelManager;
@@ -113,6 +119,9 @@ public class ResultPartitionFactory {
         this.sortShuffleMinParallelism = sortShuffleMinParallelism;
         this.sslEnabled = sslEnabled;
         this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+        this.hybridShuffleSpilledIndexSegmentSize = 
hybridShuffleSpilledIndexSegmentSize;
+        this.hybridShuffleNumRetainedInMemoryRegionsMax =
+                hybridShuffleNumRetainedInMemoryRegionsMax;
     }
 
     public ResultPartition create(
@@ -240,16 +249,7 @@ public class ResultPartitionFactory {
                             partitionManager,
                             channelManager.createChannel().getPath(),
                             networkBufferSize,
-                            HybridShuffleConfiguration.builder(
-                                            numberOfSubpartitions,
-                                            
batchShuffleReadBufferPool.getNumBuffersPerRequest())
-                                    .setSpillingStrategyType(
-                                            type == 
ResultPartitionType.HYBRID_FULL
-                                                    ? 
HybridShuffleConfiguration
-                                                            
.SpillingStrategyType.FULL
-                                                    : 
HybridShuffleConfiguration
-                                                            
.SpillingStrategyType.SELECTIVE)
-                                    .build(),
+                            
getHybridShuffleConfiguration(numberOfSubpartitions, type),
                             bufferCompressor,
                             isBroadcast,
                             bufferPoolFactory);
@@ -262,6 +262,19 @@ public class ResultPartitionFactory {
         return partition;
     }
 
+    private HybridShuffleConfiguration getHybridShuffleConfiguration(
+            int numberOfSubpartitions, ResultPartitionType 
resultPartitionType) {
+        return HybridShuffleConfiguration.builder(
+                        numberOfSubpartitions, 
batchShuffleReadBufferPool.getNumBuffersPerRequest())
+                .setSpillingStrategyType(
+                        resultPartitionType == ResultPartitionType.HYBRID_FULL
+                                ? 
HybridShuffleConfiguration.SpillingStrategyType.FULL
+                                : 
HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE)
+                
.setSpilledIndexSegmentSize(hybridShuffleSpilledIndexSegmentSize)
+                
.setNumRetainedInMemoryRegionsMax(hybridShuffleNumRetainedInMemoryRegionsMax)
+                .build();
+    }
+
     private static void initializeBoundedBlockingPartitions(
             ResultSubpartition[] subpartitions,
             BoundedBlockingResultPartition parent,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java
index b9bc5c01de4..ef1bd7f1f45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java
@@ -64,6 +64,9 @@ public interface HsFileDataIndex {
      */
     void markBufferReleased(int subpartitionId, int bufferIndex);
 
+    /** Close this file data index. */
+    void close();
+
     /**
      * Represents a series of physically continuous buffers in the file, which 
are readable, from
      * the same subpartition, and has sequential buffer index.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 5249b644b5c..c832d715329 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
+import org.apache.flink.util.ExceptionUtils;
+
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,7 +33,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.TreeMap;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -38,15 +41,35 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public class HsFileDataIndexImpl implements HsFileDataIndex {
 
     @GuardedBy("lock")
-    private final List<TreeMap<Integer, InternalRegion>>
-            subpartitionFirstBufferIndexInternalRegions;
+    private final HsFileDataIndexCache indexCache;
 
+    /**
+     * {@link HsFileDataIndexCache} is not thread-safe, any access to it needs 
to hold this lock.
+     */
     private final Object lock = new Object();
 
-    public HsFileDataIndexImpl(int numSubpartitions) {
-        this.subpartitionFirstBufferIndexInternalRegions = new 
ArrayList<>(numSubpartitions);
-        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
-            subpartitionFirstBufferIndexInternalRegions.add(new TreeMap<>());
+    public HsFileDataIndexImpl(
+            int numSubpartitions,
+            Path indexFilePath,
+            int spilledIndexSegmentSize,
+            long numRetainedInMemoryRegionsMax) {
+        this.indexCache =
+                new HsFileDataIndexCache(
+                        numSubpartitions,
+                        indexFilePath,
+                        numRetainedInMemoryRegionsMax,
+                        new HsFileDataIndexSpilledRegionManagerImpl.Factory(
+                                spilledIndexSegmentSize, 
numRetainedInMemoryRegionsMax));
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            try {
+                indexCache.close();
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e);
+            }
         }
     }
 
@@ -67,14 +90,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex {
         final Map<Integer, List<InternalRegion>> subpartitionInternalRegions =
                 convertToInternalRegions(spilledBuffers);
         synchronized (lock) {
-            subpartitionInternalRegions.forEach(
-                    (subpartition, internalRegions) -> {
-                        TreeMap<Integer, InternalRegion> treeMap =
-                                
subpartitionFirstBufferIndexInternalRegions.get(subpartition);
-                        for (InternalRegion internalRegion : internalRegions) {
-                            treeMap.put(internalRegion.firstBufferIndex, 
internalRegion);
-                        }
-                    });
+            subpartitionInternalRegions.forEach(indexCache::put);
         }
     }
 
@@ -88,12 +104,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex 
{
 
     @GuardedBy("lock")
     private Optional<InternalRegion> getInternalRegion(int subpartitionId, int 
bufferIndex) {
-        return Optional.ofNullable(
-                        subpartitionFirstBufferIndexInternalRegions
-                                .get(subpartitionId)
-                                .floorEntry(bufferIndex))
-                .map(Map.Entry::getValue)
-                .filter(internalRegion -> 
internalRegion.containBuffer(bufferIndex));
+        return indexCache.get(subpartitionId, bufferIndex);
     }
 
     private static Map<Integer, List<InternalRegion>> convertToInternalRegions(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index a54456cabcc..dc898dacfb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -179,7 +179,8 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
         }
     }
 
-    public void deleteShuffleFile() {
+    public void closeDataIndexAndDeleteShuffleFile() {
+        dataIndex.close();
         IOUtils.deleteFileQuietly(dataFilePath);
     }
 
@@ -207,8 +208,8 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
             failSubpartitionReaders(
                     pendingReaders,
                     new IllegalStateException("Result partition has been 
already released."));
-            // delete the shuffle file only when no reader is reading now.
-            releaseFuture.thenRun(this::deleteShuffleFile);
+            // close data index and delete shuffle file only when no reader is 
reading now.
+            releaseFuture.thenRun(this::closeDataIndexAndDeleteShuffleFile);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 33f630fd79c..56cdef6e006 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -60,6 +60,8 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class HsResultPartition extends ResultPartition {
     public static final String DATA_FILE_SUFFIX = ".hybrid.data";
 
+    public static final String INDEX_FILE_SUFFIX = ".hybrid.index";
+
     public static final int BROADCAST_CHANNEL = 0;
 
     private final HsFileDataIndex dataIndex;
@@ -68,6 +70,8 @@ public class HsResultPartition extends ResultPartition {
 
     private final Path dataFilePath;
 
+    private final Path indexFilePath;
+
     private final int networkBufferSize;
 
     private final HybridShuffleConfiguration hybridShuffleConfiguration;
@@ -109,8 +113,14 @@ public class HsResultPartition extends ResultPartition {
                 bufferCompressor,
                 bufferPoolFactory);
         this.networkBufferSize = networkBufferSize;
-        this.dataIndex = new HsFileDataIndexImpl(isBroadcastOnly ? 1 : 
numSubpartitions);
         this.dataFilePath = new File(dataFileBashPath + 
DATA_FILE_SUFFIX).toPath();
+        this.indexFilePath = new File(dataFileBashPath + 
INDEX_FILE_SUFFIX).toPath();
+        this.dataIndex =
+                new HsFileDataIndexImpl(
+                        isBroadcastOnly ? 1 : numSubpartitions,
+                        indexFilePath,
+                        
hybridShuffleConfiguration.getSpilledIndexSegmentSize(),
+                        
hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax());
         this.hybridShuffleConfiguration = hybridShuffleConfiguration;
         this.isBroadcastOnly = isBroadcastOnly;
         this.fileDataManager =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
index f3df0b2ab5c..fdc252ad02b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java
@@ -38,6 +38,10 @@ public class HybridShuffleConfiguration {
 
     private static final long DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS = 
1000;
 
+    private static final long DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX = 
Long.MAX_VALUE;
+
+    private static final int DEFAULT_SPILLED_INDEX_SEGMENT_SIZE = 256;
+
     private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME =
             SpillingStrategyType.FULL;
 
@@ -49,6 +53,12 @@ public class HybridShuffleConfiguration {
 
     private final SpillingStrategyType spillingStrategyType;
 
+    private final long numRetainedInMemoryRegionsMax;
+
+    private final int spilledIndexSegmentSize;
+
+    private final long bufferPoolSizeCheckIntervalMs;
+
     // ----------------------------------------
     //        Selective Spilling Strategy
     // ----------------------------------------
@@ -65,8 +75,6 @@ public class HybridShuffleConfiguration {
 
     private final float fullStrategyReleaseBufferRatio;
 
-    private final long bufferPoolSizeCheckIntervalMs;
-
     private HybridShuffleConfiguration(
             int maxBuffersReadAhead,
             Duration bufferRequestTimeout,
@@ -77,7 +85,9 @@ public class HybridShuffleConfiguration {
             float fullStrategyReleaseThreshold,
             float fullStrategyReleaseBufferRatio,
             SpillingStrategyType spillingStrategyType,
-            long bufferPoolSizeCheckIntervalMs) {
+            long bufferPoolSizeCheckIntervalMs,
+            long numRetainedInMemoryRegionsMax,
+            int spilledIndexSegmentSize) {
         this.maxBuffersReadAhead = maxBuffersReadAhead;
         this.bufferRequestTimeout = bufferRequestTimeout;
         this.maxRequestedBuffers = maxRequestedBuffers;
@@ -89,6 +99,8 @@ public class HybridShuffleConfiguration {
         this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio;
         this.spillingStrategyType = spillingStrategyType;
         this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs;
+        this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
+        this.spilledIndexSegmentSize = spilledIndexSegmentSize;
     }
 
     public static Builder builder(int numSubpartitions, int 
numBuffersPerRequest) {
@@ -159,6 +171,16 @@ public class HybridShuffleConfiguration {
         return bufferPoolSizeCheckIntervalMs;
     }
 
+    /** Segment size of hybrid spilled file data index. */
+    public int getSpilledIndexSegmentSize() {
+        return spilledIndexSegmentSize;
+    }
+
+    /** Max number of hybrid retained regions in memory. */
+    public long getNumRetainedInMemoryRegionsMax() {
+        return numRetainedInMemoryRegionsMax;
+    }
+
     /** Type of {@link HsSpillingStrategy}. */
     public enum SpillingStrategyType {
         FULL,
@@ -187,6 +209,10 @@ public class HybridShuffleConfiguration {
 
         private SpillingStrategyType spillingStrategyType = 
DEFAULT_SPILLING_STRATEGY_NAME;
 
+        private long numRetainedInMemoryRegionsMax = 
DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX;
+
+        private int spilledIndexSegmentSize = 
DEFAULT_SPILLED_INDEX_SEGMENT_SIZE;
+
         private final int numSubpartitions;
 
         private final int numBuffersPerRequest;
@@ -244,6 +270,16 @@ public class HybridShuffleConfiguration {
             return this;
         }
 
+        public Builder setNumRetainedInMemoryRegionsMax(long 
numRetainedInMemoryRegionsMax) {
+            this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
+            return this;
+        }
+
+        public Builder setSpilledIndexSegmentSize(int spilledIndexSegmentSize) 
{
+            this.spilledIndexSegmentSize = spilledIndexSegmentSize;
+            return this;
+        }
+
         public HybridShuffleConfiguration build() {
             return new HybridShuffleConfiguration(
                     maxBuffersReadAhead,
@@ -255,7 +291,9 @@ public class HybridShuffleConfiguration {
                     fullStrategyReleaseThreshold,
                     fullStrategyReleaseBufferRatio,
                     spillingStrategyType,
-                    bufferPoolSizeCheckIntervalMs);
+                    bufferPoolSizeCheckIntervalMs,
+                    numRetainedInMemoryRegionsMax,
+                    spilledIndexSegmentSize);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index f18256649cc..2bc9effdfa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -99,6 +99,10 @@ public class NettyShuffleEnvironmentConfiguration {
 
     private final int maxOverdraftBuffersPerGate;
 
+    private final int hybridShuffleSpilledIndexSegmentSize;
+
+    private final long hybridShuffleNumRetainedInMemoryRegionsMax;
+
     public NettyShuffleEnvironmentConfiguration(
             int numNetworkBuffers,
             int networkBufferSize,
@@ -120,7 +124,9 @@ public class NettyShuffleEnvironmentConfiguration {
             BufferDebloatConfiguration debloatConfiguration,
             int maxNumberOfConnections,
             boolean connectionReuseEnabled,
-            int maxOverdraftBuffersPerGate) {
+            int maxOverdraftBuffersPerGate,
+            int hybridShuffleSpilledIndexSegmentSize,
+            long hybridShuffleNumRetainedInMemoryRegionsMax) {
 
         this.numNetworkBuffers = numNetworkBuffers;
         this.networkBufferSize = networkBufferSize;
@@ -143,6 +149,9 @@ public class NettyShuffleEnvironmentConfiguration {
         this.maxNumberOfConnections = maxNumberOfConnections;
         this.connectionReuseEnabled = connectionReuseEnabled;
         this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+        this.hybridShuffleSpilledIndexSegmentSize = 
hybridShuffleSpilledIndexSegmentSize;
+        this.hybridShuffleNumRetainedInMemoryRegionsMax =
+                hybridShuffleNumRetainedInMemoryRegionsMax;
     }
 
     // ------------------------------------------------------------------------
@@ -235,6 +244,14 @@ public class NettyShuffleEnvironmentConfiguration {
         return maxOverdraftBuffersPerGate;
     }
 
+    public long getHybridShuffleNumRetainedInMemoryRegionsMax() {
+        return hybridShuffleNumRetainedInMemoryRegionsMax;
+    }
+
+    public int getHybridShuffleSpilledIndexSegmentSize() {
+        return hybridShuffleSpilledIndexSegmentSize;
+    }
+
     // ------------------------------------------------------------------------
 
     /**
@@ -333,6 +350,15 @@ public class NettyShuffleEnvironmentConfiguration {
                 configuration.get(
                         
NettyShuffleEnvironmentOptions.TCP_CONNECTION_REUSE_ACROSS_JOBS_ENABLED);
 
+        int hybridShuffleSpilledIndexSegmentSize =
+                configuration.get(
+                        
NettyShuffleEnvironmentOptions.HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE);
+
+        long hybridShuffleNumRetainedInMemoryRegionsMax =
+                configuration.get(
+                        NettyShuffleEnvironmentOptions
+                                
.HYBRID_SHUFFLE_NUM_RETAINED_IN_MEMORY_REGIONS_MAX);
+
         return new NettyShuffleEnvironmentConfiguration(
                 numberOfNetworkBuffers,
                 pageSize,
@@ -354,7 +380,9 @@ public class NettyShuffleEnvironmentConfiguration {
                 BufferDebloatConfiguration.fromConfiguration(configuration),
                 maxNumConnections,
                 connectionReuseEnabled,
-                maxOverdraftBuffersPerGate);
+                maxOverdraftBuffersPerGate,
+                hybridShuffleSpilledIndexSegmentSize,
+                hybridShuffleNumRetainedInMemoryRegionsMax);
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 1f9ed2e21b4..2862ab0c4c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -88,6 +88,10 @@ public class NettyShuffleEnvironmentBuilder {
 
     private int maxNumberOfConnections = 1;
 
+    private long hybridShuffleNumRetainedInMemoryRegionsMax = Long.MAX_VALUE;
+
+    private int hybridShuffleSpilledIndexSegmentSize = 256;
+
     public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID 
taskManagerLocation) {
         this.taskManagerLocation = taskManagerLocation;
         return this;
@@ -204,6 +208,19 @@ public class NettyShuffleEnvironmentBuilder {
         return this;
     }
 
+    public NettyShuffleEnvironmentBuilder 
setHybridShuffleNumRetainedInMemoryRegionsMax(
+            long hybridShuffleNumRetainedInMemoryRegionsMax) {
+        this.hybridShuffleNumRetainedInMemoryRegionsMax =
+                hybridShuffleNumRetainedInMemoryRegionsMax;
+        return this;
+    }
+
+    public NettyShuffleEnvironmentBuilder 
setHybridShuffleSpilledIndexSegmentSize(
+            int hybridShuffleSpilledIndexSegmentSize) {
+        this.hybridShuffleSpilledIndexSegmentSize = 
hybridShuffleSpilledIndexSegmentSize;
+        return this;
+    }
+
     public NettyShuffleEnvironment build() {
         return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                 new NettyShuffleEnvironmentConfiguration(
@@ -227,7 +244,9 @@ public class NettyShuffleEnvironmentBuilder {
                         debloatConfiguration,
                         maxNumberOfConnections,
                         connectionReuseEnabled,
-                        maxOverdraftBuffersPerGate),
+                        maxOverdraftBuffersPerGate,
+                        hybridShuffleSpilledIndexSegmentSize,
+                        hybridShuffleNumRetainedInMemoryRegionsMax),
                 taskManagerLocation,
                 new TaskEventDispatcher(),
                 resultPartitionManager,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 7ad6cd08b84..0a635136638 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -85,6 +85,10 @@ public class ResultPartitionBuilder {
 
     private int maxOverdraftBuffersPerGate = 5;
 
+    private int hybridShuffleSpilledIndexSegmentSize = 256;
+
+    private long hybridShuffleNumRetainedInMemoryRegionsMax = Long.MAX_VALUE;
+
     public ResultPartitionBuilder setResultPartitionIndex(int partitionIndex) {
         this.partitionIndex = partitionIndex;
         return this;
@@ -218,6 +222,19 @@ public class ResultPartitionBuilder {
         return this;
     }
 
+    public ResultPartitionBuilder setHybridShuffleNumRetainedInMemoryRegionsMax(
+            long hybridShuffleNumRetainedInMemoryRegionsMax) {
+        this.hybridShuffleNumRetainedInMemoryRegionsMax =
+                hybridShuffleNumRetainedInMemoryRegionsMax;
+        return this;
+    }
+
+    public ResultPartitionBuilder setHybridShuffleSpilledIndexSegmentSize(
+            int hybridShuffleSpilledIndexSegmentSize) {
+        this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize;
+        return this;
+    }
+
     public ResultPartition build() {
         ResultPartitionFactory resultPartitionFactory =
                 new ResultPartitionFactory(
@@ -236,7 +253,9 @@ public class ResultPartitionBuilder {
                         sortShuffleMinBuffers,
                         sortShuffleMinParallelism,
                         sslEnabled,
-                        maxOverdraftBuffersPerGate);
+                        maxOverdraftBuffersPerGate,
+                        hybridShuffleSpilledIndexSegmentSize,
+                        hybridShuffleNumRetainedInMemoryRegionsMax);
 
         SupplierWithException<BufferPool, IOException> factory =
                 bufferPoolFactory.orElseGet(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 79a0aa89463..07d4a97f28b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -174,7 +174,9 @@ class ResultPartitionFactoryTest {
                         10,
                         sortShuffleMinParallelism,
                         false,
-                        0);
+                        0,
+                        256,
+                        Long.MAX_VALUE);
 
         final ResultPartitionDeploymentDescriptor descriptor =
                 new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java
index 0f05cfdd9a6..7b54105ec0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.util.TestLoggerExtension;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -42,8 +44,10 @@ class HsFileDataIndexImplTest {
     private HsFileDataIndex hsDataIndex;
 
     @BeforeEach
-    void before() {
-        hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS);
+    void before(@TempDir Path tempDir) throws Exception {
+        hsDataIndex =
+                new HsFileDataIndexImpl(
+                        NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE);
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 401f95ff823..f2b6591884c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -96,7 +96,8 @@ class HsFileDataManagerTest {
                 new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
-                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        new HsFileDataIndexImpl(
+                                NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE),
                         dataFilePath,
                         factory,
                         HybridShuffleConfiguration.builder(
@@ -218,7 +219,7 @@ class HsFileDataManagerTest {
     }
 
     @Test
-    void testRunRequestBufferTimeout() throws Exception {
+    void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
 
         // request all buffer first.
@@ -229,7 +230,8 @@ class HsFileDataManagerTest {
                 new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
-                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        new HsFileDataIndexImpl(
+                                NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE),
                         dataFilePath,
                         factory,
                         HybridShuffleConfiguration.builder(
@@ -351,7 +353,7 @@ class HsFileDataManagerTest {
      * release subpartition reader and subpartition reader fail should not be inside lock.
      */
     @Test
-    void testConsumeWhileReleaseNoDeadlock() throws Exception {
+    void testConsumeWhileReleaseNoDeadlock(@TempDir Path tempDir) throws Exception {
         CompletableFuture<Void> consumerStart = new CompletableFuture<>();
         CompletableFuture<Void> readerFail = new CompletableFuture<>();
         HsSubpartitionConsumer subpartitionView =
@@ -363,7 +365,8 @@ class HsFileDataManagerTest {
                         DEFAULT,
                         dataFileChannel,
                         subpartitionView,
-                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        new HsFileDataIndexImpl(
+                                NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE),
                         5,
                         fileDataManager::releaseSubpartitionReader,
                         BufferReaderWriterUtil.allocatedHeaderBuffer()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index e3c3663af06..10ad07b69f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -55,9 +55,12 @@ class HsMemoryDataManagerTest {
 
     private Path dataFilePath;
 
+    private Path indexFilePath;
+
     @BeforeEach
     void before(@TempDir Path tempDir) {
         this.dataFilePath = tempDir.resolve(".data");
+        this.indexFilePath = tempDir.resolve(".index");
     }
 
     @Test
@@ -242,7 +245,9 @@ class HsMemoryDataManagerTest {
 
     private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy)
             throws Exception {
-        return createMemoryDataManager(spillStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
+        return createMemoryDataManager(
+                spillStrategy,
+                new HsFileDataIndexImpl(NUM_SUBPARTITIONS, indexFilePath, 256, Long.MAX_VALUE));
     }
 
     private HsMemoryDataManager createMemoryDataManager(
@@ -255,7 +260,9 @@ class HsMemoryDataManagerTest {
     private HsMemoryDataManager createMemoryDataManager(
             HsSpillingStrategy spillingStrategy, BufferPool bufferPool) throws Exception {
         return createMemoryDataManager(
-                bufferPool, spillingStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS));
+                bufferPool,
+                spillingStrategy,
+                new HsFileDataIndexImpl(NUM_SUBPARTITIONS, indexFilePath, 256, Long.MAX_VALUE));
     }
 
     private HsMemoryDataManager createMemoryDataManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index b8f5d206a78..2bef058599f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -79,14 +79,17 @@ class HsSubpartitionFileReaderImplTest {
 
     private FileChannel dataFileChannel;
 
+    private Path indexFilePath;
+
     private long currentFileOffset;
 
     @BeforeEach
     void before(@TempDir Path tempPath) throws Exception {
         random = new Random();
         Path dataFilePath = Files.createFile(tempPath.resolve(UUID.randomUUID().toString()));
+        indexFilePath = tempPath.resolve(UUID.randomUUID().toString());
         dataFileChannel = openFileChannel(dataFilePath);
-        diskIndex = new HsFileDataIndexImpl(1);
+        diskIndex = createDataIndex(1, indexFilePath);
         subpartitionOperation = new TestingSubpartitionConsumerInternalOperation();
         currentFileOffset = 0L;
     }
@@ -97,8 +100,8 @@ class HsSubpartitionFileReaderImplTest {
     }
 
     @Test
-    void testReadBuffer() throws Exception {
-        diskIndex = new HsFileDataIndexImpl(2);
+    void testReadBuffer(@TempDir Path tmpPath) throws Exception {
+        diskIndex = createDataIndex(2, tmpPath.resolve(".index"));
         TestingSubpartitionConsumerInternalOperation viewNotifier1 =
                 new TestingSubpartitionConsumerInternalOperation();
         TestingSubpartitionConsumerInternalOperation viewNotifier2 =
@@ -135,13 +138,14 @@ class HsSubpartitionFileReaderImplTest {
 
     @ParameterizedTest
     @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
-    void testReadBufferCompressed(String compressionFactoryName) throws Exception {
+    void testReadBufferCompressed(String compressionFactoryName, @TempDir Path tmpPath)
+            throws Exception {
         BufferCompressor bufferCompressor =
                 new BufferCompressor(bufferSize, compressionFactoryName);
         BufferDecompressor bufferDecompressor =
                 new BufferDecompressor(bufferSize, compressionFactoryName);
 
-        diskIndex = new HsFileDataIndexImpl(1);
+        diskIndex = createDataIndex(1, tmpPath.resolve(".index"));
         TestingSubpartitionConsumerInternalOperation viewNotifier =
                 new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier);
@@ -360,8 +364,8 @@ class HsSubpartitionFileReaderImplTest {
     }
 
     @Test
-    void testCompareTo() throws Exception {
-        diskIndex = new HsFileDataIndexImpl(2);
+    void testCompareTo(@TempDir Path tempPath) throws Exception {
+        diskIndex = createDataIndex(2, tempPath.resolve(".index"));
         TestingSubpartitionConsumerInternalOperation viewNotifier1 =
                 new TestingSubpartitionConsumerInternalOperation();
         TestingSubpartitionConsumerInternalOperation viewNotifier2 =
@@ -602,6 +606,10 @@ class HsSubpartitionFileReaderImplTest {
                 BufferReaderWriterUtil.allocatedHeaderBuffer());
     }
 
+    private HsFileDataIndexImpl createDataIndex(int numSubpartitions, Path indexFilePath) {
+        return new HsFileDataIndexImpl(numSubpartitions, indexFilePath, 256, Long.MAX_VALUE);
+    }
+
     private static FileChannel openFileChannel(Path path) throws IOException {
         return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 11ea5c3766b..176d02e649b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -113,7 +113,8 @@ class HsSubpartitionViewTest {
                         bufferSize,
                         bufferPool,
                         spillingStrategy,
-                        new HsFileDataIndexImpl(1),
+                        new HsFileDataIndexImpl(
+                                1, dataFilePath.resolve(".index"), 256, Long.MAX_VALUE),
                         dataFilePath.resolve(".data"),
                         null,
                         0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java
index 115f1f5c66b..841ec4a5e80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java
@@ -60,6 +60,11 @@ public class TestingFileDataIndex implements HsFileDataIndex {
         markBufferReadableConsumer.accept(subpartitionId, bufferIndex);
     }
 
+    @Override
+    public void close() {
+        // do nothing.
+    }
+
     public static Builder builder() {
         return new Builder();
     }


Reply via email to