This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e50e24797fbb154cdf7c484775f0fafa94cf34c Author: Weijie Guo <[email protected]> AuthorDate: Thu Jan 5 15:34:09 2023 +0800 [FLINK-30332][network] HsFileDataIndex supports caching index entries and introduces a config option to configure the cache size. This closes #21603 --- .../generated/all_taskmanager_network_section.html | 12 +++++ .../netty_shuffle_environment_configuration.html | 12 +++++ .../NettyShuffleEnvironmentOptions.java | 18 ++++++++ .../io/network/NettyShuffleServiceFactory.java | 4 +- .../network/partition/ResultPartitionFactory.java | 35 +++++++++----- .../network/partition/hybrid/HsFileDataIndex.java | 3 ++ .../partition/hybrid/HsFileDataIndexImpl.java | 53 +++++++++++++--------- .../partition/hybrid/HsFileDataManager.java | 7 +-- .../partition/hybrid/HsResultPartition.java | 12 ++++- .../hybrid/HybridShuffleConfiguration.java | 46 +++++++++++++++++-- .../NettyShuffleEnvironmentConfiguration.java | 32 ++++++++++++- .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++- .../network/partition/ResultPartitionBuilder.java | 21 ++++++++- .../partition/ResultPartitionFactoryTest.java | 4 +- .../partition/hybrid/HsFileDataIndexImplTest.java | 8 +++- .../partition/hybrid/HsFileDataManagerTest.java | 13 ++++-- .../partition/hybrid/HsMemoryDataManagerTest.java | 11 ++++- .../hybrid/HsSubpartitionFileReaderImplTest.java | 22 ++++++--- .../partition/hybrid/HsSubpartitionViewTest.java | 3 +- .../partition/hybrid/TestingFileDataIndex.java | 5 ++ 20 files changed, 279 insertions(+), 63 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index 34deb37d450..d4d5b2bed0a 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -32,6 +32,18 @@ <td>Boolean</td> <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound 
network queue lengths.</td> </tr> + <tr> + <td><h5>taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max</h5></td> + <td style="word-wrap: break-word;">1048576</td> + <td>Long</td> + <td>Controls the max number of hybrid retained regions in memory.</td> + </tr> + <tr> + <td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td> + <td style="word-wrap: break-word;">1024</td> + <td>Integer</td> + <td>Controls the segment size(in bytes) of hybrid spilled file data index.</td> + </tr> <tr> <td><h5>taskmanager.network.max-num-tcp-connections</h5></td> <td style="word-wrap: break-word;">1</td> diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html index 4ed81c078e3..adcdef70d27 100644 --- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html +++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html @@ -50,6 +50,18 @@ <td>Boolean</td> <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td> </tr> + <tr> + <td><h5>taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max</h5></td> + <td style="word-wrap: break-word;">1048576</td> + <td>Long</td> + <td>Controls the max number of hybrid retained regions in memory.</td> + </tr> + <tr> + <td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td> + <td style="word-wrap: break-word;">1024</td> + <td>Integer</td> + <td>Controls the segment size(in bytes) of hybrid spilled file data index.</td> + </tr> <tr> <td><h5>taskmanager.network.max-num-tcp-connections</h5></td> <td style="word-wrap: break-word;">1</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java index 802ee049c34..5e943363d52 100644 --- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java @@ -270,6 +270,24 @@ public class NettyShuffleEnvironmentOptions { // this raw value must be changed correspondingly "taskmanager.memory.framework.off-heap.batch-shuffle.size")); + /** Segment size of hybrid spilled file data index. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + public static final ConfigOption<Integer> HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE = + key("taskmanager.network.hybrid-shuffle.spill-index-segment-size") + .intType() + .defaultValue(1024) + .withDescription( + "Controls the segment size(in bytes) of hybrid spilled file data index."); + + /** Max number of hybrid retained regions in memory. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + public static final ConfigOption<Long> HYBRID_SHUFFLE_NUM_RETAINED_IN_MEMORY_REGIONS_MAX = + key("taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max") + .longType() + .defaultValue(1024 * 1024L) + .withDescription( + "Controls the max number of hybrid retained regions in memory."); + /** Number of max buffers can be used for each output subpartition. 
*/ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) public static final ConfigOption<Integer> NETWORK_MAX_BUFFERS_PER_CHANNEL = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index bd6847f29e4..e2f2166397b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -210,7 +210,9 @@ public class NettyShuffleServiceFactory config.sortShuffleMinBuffers(), config.sortShuffleMinParallelism(), config.isSSLEnabled(), - config.getMaxOverdraftBuffersPerGate()); + config.getMaxOverdraftBuffersPerGate(), + config.getHybridShuffleSpilledIndexSegmentSize(), + config.getHybridShuffleNumRetainedInMemoryRegionsMax()); SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 7858f4c6709..968fbcc80dc 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -75,6 +75,10 @@ public class ResultPartitionFactory { private final int sortShuffleMinParallelism; + private final int hybridShuffleSpilledIndexSegmentSize; + + private final long hybridShuffleNumRetainedInMemoryRegionsMax; + private final boolean sslEnabled; private final int maxOverdraftBuffersPerGate; @@ -95,7 +99,9 @@ public class ResultPartitionFactory { int sortShuffleMinBuffers, int sortShuffleMinParallelism, boolean sslEnabled, - int maxOverdraftBuffersPerGate) { + int 
maxOverdraftBuffersPerGate, + int hybridShuffleSpilledIndexSegmentSize, + long hybridShuffleNumRetainedInMemoryRegionsMax) { this.partitionManager = partitionManager; this.channelManager = channelManager; @@ -113,6 +119,9 @@ public class ResultPartitionFactory { this.sortShuffleMinParallelism = sortShuffleMinParallelism; this.sslEnabled = sslEnabled; this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate; + this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + this.hybridShuffleNumRetainedInMemoryRegionsMax = + hybridShuffleNumRetainedInMemoryRegionsMax; } public ResultPartition create( @@ -240,16 +249,7 @@ public class ResultPartitionFactory { partitionManager, channelManager.createChannel().getPath(), networkBufferSize, - HybridShuffleConfiguration.builder( - numberOfSubpartitions, - batchShuffleReadBufferPool.getNumBuffersPerRequest()) - .setSpillingStrategyType( - type == ResultPartitionType.HYBRID_FULL - ? HybridShuffleConfiguration - .SpillingStrategyType.FULL - : HybridShuffleConfiguration - .SpillingStrategyType.SELECTIVE) - .build(), + getHybridShuffleConfiguration(numberOfSubpartitions, type), bufferCompressor, isBroadcast, bufferPoolFactory); @@ -262,6 +262,19 @@ public class ResultPartitionFactory { return partition; } + private HybridShuffleConfiguration getHybridShuffleConfiguration( + int numberOfSubpartitions, ResultPartitionType resultPartitionType) { + return HybridShuffleConfiguration.builder( + numberOfSubpartitions, batchShuffleReadBufferPool.getNumBuffersPerRequest()) + .setSpillingStrategyType( + resultPartitionType == ResultPartitionType.HYBRID_FULL + ? 
HybridShuffleConfiguration.SpillingStrategyType.FULL + : HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE) + .setSpilledIndexSegmentSize(hybridShuffleSpilledIndexSegmentSize) + .setNumRetainedInMemoryRegionsMax(hybridShuffleNumRetainedInMemoryRegionsMax) + .build(); + } + private static void initializeBoundedBlockingPartitions( ResultSubpartition[] subpartitions, BoundedBlockingResultPartition parent, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java index b9bc5c01de4..ef1bd7f1f45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndex.java @@ -64,6 +64,9 @@ public interface HsFileDataIndex { */ void markBufferReleased(int subpartitionId, int bufferIndex); + /** Close this file data index. */ + void close(); + /** * Represents a series of physically continuous buffers in the file, which are readable, from * the same subpartition, and has sequential buffer index. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index 5249b644b5c..c832d715329 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -18,9 +18,13 @@ package org.apache.flink.runtime.io.network.partition.hybrid; +import org.apache.flink.util.ExceptionUtils; + import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,7 +33,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.TreeMap; import static org.apache.flink.util.Preconditions.checkArgument; @@ -38,15 +41,35 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class HsFileDataIndexImpl implements HsFileDataIndex { @GuardedBy("lock") - private final List<TreeMap<Integer, InternalRegion>> - subpartitionFirstBufferIndexInternalRegions; + private final HsFileDataIndexCache indexCache; + /** + * {@link HsFileDataIndexCache} is not thread-safe, any access to it needs to hold this lock. 
+ */ private final Object lock = new Object(); - public HsFileDataIndexImpl(int numSubpartitions) { - this.subpartitionFirstBufferIndexInternalRegions = new ArrayList<>(numSubpartitions); - for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) { - subpartitionFirstBufferIndexInternalRegions.add(new TreeMap<>()); + public HsFileDataIndexImpl( + int numSubpartitions, + Path indexFilePath, + int spilledIndexSegmentSize, + long numRetainedInMemoryRegionsMax) { + this.indexCache = + new HsFileDataIndexCache( + numSubpartitions, + indexFilePath, + numRetainedInMemoryRegionsMax, + new HsFileDataIndexSpilledRegionManagerImpl.Factory( + spilledIndexSegmentSize, numRetainedInMemoryRegionsMax)); + } + + @Override + public void close() { + synchronized (lock) { + try { + indexCache.close(); + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } } } @@ -67,14 +90,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { final Map<Integer, List<InternalRegion>> subpartitionInternalRegions = convertToInternalRegions(spilledBuffers); synchronized (lock) { - subpartitionInternalRegions.forEach( - (subpartition, internalRegions) -> { - TreeMap<Integer, InternalRegion> treeMap = - subpartitionFirstBufferIndexInternalRegions.get(subpartition); - for (InternalRegion internalRegion : internalRegions) { - treeMap.put(internalRegion.firstBufferIndex, internalRegion); - } - }); + subpartitionInternalRegions.forEach(indexCache::put); } } @@ -88,12 +104,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { @GuardedBy("lock") private Optional<InternalRegion> getInternalRegion(int subpartitionId, int bufferIndex) { - return Optional.ofNullable( - subpartitionFirstBufferIndexInternalRegions - .get(subpartitionId) - .floorEntry(bufferIndex)) - .map(Map.Entry::getValue) - .filter(internalRegion -> internalRegion.containBuffer(bufferIndex)); + return indexCache.get(subpartitionId, bufferIndex); } private static Map<Integer, 
List<InternalRegion>> convertToInternalRegions( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index a54456cabcc..dc898dacfb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -179,7 +179,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } } - public void deleteShuffleFile() { + public void closeDataIndexAndDeleteShuffleFile() { + dataIndex.close(); IOUtils.deleteFileQuietly(dataFilePath); } @@ -207,8 +208,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler { failSubpartitionReaders( pendingReaders, new IllegalStateException("Result partition has been already released.")); - // delete the shuffle file only when no reader is reading now. - releaseFuture.thenRun(this::deleteShuffleFile); + // close data index and delete shuffle file only when no reader is reading now. 
+ releaseFuture.thenRun(this::closeDataIndexAndDeleteShuffleFile); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 33f630fd79c..56cdef6e006 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -60,6 +60,8 @@ import static org.apache.flink.util.Preconditions.checkState; public class HsResultPartition extends ResultPartition { public static final String DATA_FILE_SUFFIX = ".hybrid.data"; + public static final String INDEX_FILE_SUFFIX = ".hybrid.index"; + public static final int BROADCAST_CHANNEL = 0; private final HsFileDataIndex dataIndex; @@ -68,6 +70,8 @@ public class HsResultPartition extends ResultPartition { private final Path dataFilePath; + private final Path indexFilePath; + private final int networkBufferSize; private final HybridShuffleConfiguration hybridShuffleConfiguration; @@ -109,8 +113,14 @@ public class HsResultPartition extends ResultPartition { bufferCompressor, bufferPoolFactory); this.networkBufferSize = networkBufferSize; - this.dataIndex = new HsFileDataIndexImpl(isBroadcastOnly ? 1 : numSubpartitions); this.dataFilePath = new File(dataFileBashPath + DATA_FILE_SUFFIX).toPath(); + this.indexFilePath = new File(dataFileBashPath + INDEX_FILE_SUFFIX).toPath(); + this.dataIndex = + new HsFileDataIndexImpl( + isBroadcastOnly ? 
1 : numSubpartitions, + indexFilePath, + hybridShuffleConfiguration.getSpilledIndexSegmentSize(), + hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax()); this.hybridShuffleConfiguration = hybridShuffleConfiguration; this.isBroadcastOnly = isBroadcastOnly; this.fileDataManager = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java index f3df0b2ab5c..fdc252ad02b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java @@ -38,6 +38,10 @@ public class HybridShuffleConfiguration { private static final long DEFAULT_BUFFER_POLL_SIZE_CHECK_INTERVAL_MS = 1000; + private static final long DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX = Long.MAX_VALUE; + + private static final int DEFAULT_SPILLED_INDEX_SEGMENT_SIZE = 256; + private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME = SpillingStrategyType.FULL; @@ -49,6 +53,12 @@ public class HybridShuffleConfiguration { private final SpillingStrategyType spillingStrategyType; + private final long numRetainedInMemoryRegionsMax; + + private final int spilledIndexSegmentSize; + + private final long bufferPoolSizeCheckIntervalMs; + // ---------------------------------------- // Selective Spilling Strategy // ---------------------------------------- @@ -65,8 +75,6 @@ public class HybridShuffleConfiguration { private final float fullStrategyReleaseBufferRatio; - private final long bufferPoolSizeCheckIntervalMs; - private HybridShuffleConfiguration( int maxBuffersReadAhead, Duration bufferRequestTimeout, @@ -77,7 +85,9 @@ public class HybridShuffleConfiguration { float fullStrategyReleaseThreshold, float fullStrategyReleaseBufferRatio, 
SpillingStrategyType spillingStrategyType, - long bufferPoolSizeCheckIntervalMs) { + long bufferPoolSizeCheckIntervalMs, + long numRetainedInMemoryRegionsMax, + int spilledIndexSegmentSize) { this.maxBuffersReadAhead = maxBuffersReadAhead; this.bufferRequestTimeout = bufferRequestTimeout; this.maxRequestedBuffers = maxRequestedBuffers; @@ -89,6 +99,8 @@ public class HybridShuffleConfiguration { this.fullStrategyReleaseBufferRatio = fullStrategyReleaseBufferRatio; this.spillingStrategyType = spillingStrategyType; this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs; + this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; + this.spilledIndexSegmentSize = spilledIndexSegmentSize; } public static Builder builder(int numSubpartitions, int numBuffersPerRequest) { @@ -159,6 +171,16 @@ public class HybridShuffleConfiguration { return bufferPoolSizeCheckIntervalMs; } + /** Segment size of hybrid spilled file data index. */ + public int getSpilledIndexSegmentSize() { + return spilledIndexSegmentSize; + } + + /** Max number of hybrid retained regions in memory. */ + public long getNumRetainedInMemoryRegionsMax() { + return numRetainedInMemoryRegionsMax; + } + /** Type of {@link HsSpillingStrategy}. 
*/ public enum SpillingStrategyType { FULL, @@ -187,6 +209,10 @@ public class HybridShuffleConfiguration { private SpillingStrategyType spillingStrategyType = DEFAULT_SPILLING_STRATEGY_NAME; + private long numRetainedInMemoryRegionsMax = DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX; + + private int spilledIndexSegmentSize = DEFAULT_SPILLED_INDEX_SEGMENT_SIZE; + private final int numSubpartitions; private final int numBuffersPerRequest; @@ -244,6 +270,16 @@ public class HybridShuffleConfiguration { return this; } + public Builder setNumRetainedInMemoryRegionsMax(long numRetainedInMemoryRegionsMax) { + this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; + return this; + } + + public Builder setSpilledIndexSegmentSize(int spilledIndexSegmentSize) { + this.spilledIndexSegmentSize = spilledIndexSegmentSize; + return this; + } + public HybridShuffleConfiguration build() { return new HybridShuffleConfiguration( maxBuffersReadAhead, @@ -255,7 +291,9 @@ public class HybridShuffleConfiguration { fullStrategyReleaseThreshold, fullStrategyReleaseBufferRatio, spillingStrategyType, - bufferPoolSizeCheckIntervalMs); + bufferPoolSizeCheckIntervalMs, + numRetainedInMemoryRegionsMax, + spilledIndexSegmentSize); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java index f18256649cc..2bc9effdfa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java @@ -99,6 +99,10 @@ public class NettyShuffleEnvironmentConfiguration { private final int maxOverdraftBuffersPerGate; + private final int hybridShuffleSpilledIndexSegmentSize; + + private final long hybridShuffleNumRetainedInMemoryRegionsMax; + public 
NettyShuffleEnvironmentConfiguration( int numNetworkBuffers, int networkBufferSize, @@ -120,7 +124,9 @@ public class NettyShuffleEnvironmentConfiguration { BufferDebloatConfiguration debloatConfiguration, int maxNumberOfConnections, boolean connectionReuseEnabled, - int maxOverdraftBuffersPerGate) { + int maxOverdraftBuffersPerGate, + int hybridShuffleSpilledIndexSegmentSize, + long hybridShuffleNumRetainedInMemoryRegionsMax) { this.numNetworkBuffers = numNetworkBuffers; this.networkBufferSize = networkBufferSize; @@ -143,6 +149,9 @@ public class NettyShuffleEnvironmentConfiguration { this.maxNumberOfConnections = maxNumberOfConnections; this.connectionReuseEnabled = connectionReuseEnabled; this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate; + this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + this.hybridShuffleNumRetainedInMemoryRegionsMax = + hybridShuffleNumRetainedInMemoryRegionsMax; } // ------------------------------------------------------------------------ @@ -235,6 +244,14 @@ public class NettyShuffleEnvironmentConfiguration { return maxOverdraftBuffersPerGate; } + public long getHybridShuffleNumRetainedInMemoryRegionsMax() { + return hybridShuffleNumRetainedInMemoryRegionsMax; + } + + public int getHybridShuffleSpilledIndexSegmentSize() { + return hybridShuffleSpilledIndexSegmentSize; + } + // ------------------------------------------------------------------------ /** @@ -333,6 +350,15 @@ public class NettyShuffleEnvironmentConfiguration { configuration.get( NettyShuffleEnvironmentOptions.TCP_CONNECTION_REUSE_ACROSS_JOBS_ENABLED); + int hybridShuffleSpilledIndexSegmentSize = + configuration.get( + NettyShuffleEnvironmentOptions.HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE); + + long hybridShuffleNumRetainedInMemoryRegionsMax = + configuration.get( + NettyShuffleEnvironmentOptions + .HYBRID_SHUFFLE_NUM_RETAINED_IN_MEMORY_REGIONS_MAX); + return new NettyShuffleEnvironmentConfiguration( numberOfNetworkBuffers, 
pageSize, @@ -354,7 +380,9 @@ public class NettyShuffleEnvironmentConfiguration { BufferDebloatConfiguration.fromConfiguration(configuration), maxNumConnections, connectionReuseEnabled, - maxOverdraftBuffersPerGate); + maxOverdraftBuffersPerGate, + hybridShuffleSpilledIndexSegmentSize, + hybridShuffleNumRetainedInMemoryRegionsMax); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java index 1f9ed2e21b4..2862ab0c4c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java @@ -88,6 +88,10 @@ public class NettyShuffleEnvironmentBuilder { private int maxNumberOfConnections = 1; + private long hybridShuffleNumRetainedInMemoryRegionsMax = Long.MAX_VALUE; + + private int hybridShuffleSpilledIndexSegmentSize = 256; + public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) { this.taskManagerLocation = taskManagerLocation; return this; @@ -204,6 +208,19 @@ public class NettyShuffleEnvironmentBuilder { return this; } + public NettyShuffleEnvironmentBuilder setHybridShuffleNumRetainedInMemoryRegionsMax( + long hybridShuffleNumRetainedInMemoryRegionsMax) { + this.hybridShuffleNumRetainedInMemoryRegionsMax = + hybridShuffleNumRetainedInMemoryRegionsMax; + return this; + } + + public NettyShuffleEnvironmentBuilder setHybridShuffleSpilledIndexSegmentSize( + int hybridShuffleSpilledIndexSegmentSize) { + this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + return this; + } + public NettyShuffleEnvironment build() { return NettyShuffleServiceFactory.createNettyShuffleEnvironment( new NettyShuffleEnvironmentConfiguration( @@ -227,7 +244,9 @@ public class NettyShuffleEnvironmentBuilder { 
debloatConfiguration, maxNumberOfConnections, connectionReuseEnabled, - maxOverdraftBuffersPerGate), + maxOverdraftBuffersPerGate, + hybridShuffleSpilledIndexSegmentSize, + hybridShuffleNumRetainedInMemoryRegionsMax), taskManagerLocation, new TaskEventDispatcher(), resultPartitionManager, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java index 7ad6cd08b84..0a635136638 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java @@ -85,6 +85,10 @@ public class ResultPartitionBuilder { private int maxOverdraftBuffersPerGate = 5; + private int hybridShuffleSpilledIndexSegmentSize = 256; + + private long hybridShuffleNumRetainedInMemoryRegionsMax = Long.MAX_VALUE; + public ResultPartitionBuilder setResultPartitionIndex(int partitionIndex) { this.partitionIndex = partitionIndex; return this; @@ -218,6 +222,19 @@ public class ResultPartitionBuilder { return this; } + public ResultPartitionBuilder setHybridShuffleNumRetainedInMemoryRegionsMax( + long hybridShuffleNumRetainedInMemoryRegionsMax) { + this.hybridShuffleNumRetainedInMemoryRegionsMax = + hybridShuffleNumRetainedInMemoryRegionsMax; + return this; + } + + public ResultPartitionBuilder setHybridShuffleSpilledIndexSegmentSize( + int hybridShuffleSpilledIndexSegmentSize) { + this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + return this; + } + public ResultPartition build() { ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( @@ -236,7 +253,9 @@ public class ResultPartitionBuilder { sortShuffleMinBuffers, sortShuffleMinParallelism, sslEnabled, - maxOverdraftBuffersPerGate); + maxOverdraftBuffersPerGate, + 
hybridShuffleSpilledIndexSegmentSize, + hybridShuffleNumRetainedInMemoryRegionsMax); SupplierWithException<BufferPool, IOException> factory = bufferPoolFactory.orElseGet( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java index 79a0aa89463..07d4a97f28b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java @@ -174,7 +174,9 @@ class ResultPartitionFactoryTest { 10, sortShuffleMinParallelism, false, - 0); + 0, + 256, + Long.MAX_VALUE); final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java index 0f05cfdd9a6..7b54105ec0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImplTest.java @@ -25,7 +25,9 @@ import org.apache.flink.util.TestLoggerExtension; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -42,8 +44,10 @@ class HsFileDataIndexImplTest { private HsFileDataIndex hsDataIndex; @BeforeEach - void before() { - hsDataIndex = new HsFileDataIndexImpl(NUM_SUBPARTITIONS); + void before(@TempDir Path tempDir) throws Exception { + hsDataIndex = + new 
HsFileDataIndexImpl( + NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index 401f95ff823..f2b6591884c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -96,7 +96,8 @@ class HsFileDataManagerTest { new HsFileDataManager( bufferPool, ioExecutor, - new HsFileDataIndexImpl(NUM_SUBPARTITIONS), + new HsFileDataIndexImpl( + NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE), dataFilePath, factory, HybridShuffleConfiguration.builder( @@ -218,7 +219,7 @@ class HsFileDataManagerTest { } @Test - void testRunRequestBufferTimeout() throws Exception { + void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception { Duration bufferRequestTimeout = Duration.ofSeconds(3); // request all buffer first. @@ -229,7 +230,8 @@ class HsFileDataManagerTest { new HsFileDataManager( bufferPool, ioExecutor, - new HsFileDataIndexImpl(NUM_SUBPARTITIONS), + new HsFileDataIndexImpl( + NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE), dataFilePath, factory, HybridShuffleConfiguration.builder( @@ -351,7 +353,7 @@ class HsFileDataManagerTest { * release subpartition reader and subpartition reader fail should not be inside lock. 
*/ @Test - void testConsumeWhileReleaseNoDeadlock() throws Exception { + void testConsumeWhileReleaseNoDeadlock(@TempDir Path tempDir) throws Exception { CompletableFuture<Void> consumerStart = new CompletableFuture<>(); CompletableFuture<Void> readerFail = new CompletableFuture<>(); HsSubpartitionConsumer subpartitionView = @@ -363,7 +365,8 @@ class HsFileDataManagerTest { DEFAULT, dataFileChannel, subpartitionView, - new HsFileDataIndexImpl(NUM_SUBPARTITIONS), + new HsFileDataIndexImpl( + NUM_SUBPARTITIONS, tempDir.resolve(".index"), 256, Long.MAX_VALUE), 5, fileDataManager::releaseSubpartitionReader, BufferReaderWriterUtil.allocatedHeaderBuffer()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java index e3c3663af06..10ad07b69f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java @@ -55,9 +55,12 @@ class HsMemoryDataManagerTest { private Path dataFilePath; + private Path indexFilePath; + @BeforeEach void before(@TempDir Path tempDir) { this.dataFilePath = tempDir.resolve(".data"); + this.indexFilePath = tempDir.resolve(".index"); } @Test @@ -242,7 +245,9 @@ class HsMemoryDataManagerTest { private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy) throws Exception { - return createMemoryDataManager(spillStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS)); + return createMemoryDataManager( + spillStrategy, + new HsFileDataIndexImpl(NUM_SUBPARTITIONS, indexFilePath, 256, Long.MAX_VALUE)); } private HsMemoryDataManager createMemoryDataManager( @@ -255,7 +260,9 @@ class HsMemoryDataManagerTest { private HsMemoryDataManager createMemoryDataManager( HsSpillingStrategy 
spillingStrategy, BufferPool bufferPool) throws Exception { return createMemoryDataManager( - bufferPool, spillingStrategy, new HsFileDataIndexImpl(NUM_SUBPARTITIONS)); + bufferPool, + spillingStrategy, + new HsFileDataIndexImpl(NUM_SUBPARTITIONS, indexFilePath, 256, Long.MAX_VALUE)); } private HsMemoryDataManager createMemoryDataManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java index b8f5d206a78..2bef058599f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java @@ -79,14 +79,17 @@ class HsSubpartitionFileReaderImplTest { private FileChannel dataFileChannel; + private Path indexFilePath; + private long currentFileOffset; @BeforeEach void before(@TempDir Path tempPath) throws Exception { random = new Random(); Path dataFilePath = Files.createFile(tempPath.resolve(UUID.randomUUID().toString())); + indexFilePath = tempPath.resolve(UUID.randomUUID().toString()); dataFileChannel = openFileChannel(dataFilePath); - diskIndex = new HsFileDataIndexImpl(1); + diskIndex = createDataIndex(1, indexFilePath); subpartitionOperation = new TestingSubpartitionConsumerInternalOperation(); currentFileOffset = 0L; } @@ -97,8 +100,8 @@ class HsSubpartitionFileReaderImplTest { } @Test - void testReadBuffer() throws Exception { - diskIndex = new HsFileDataIndexImpl(2); + void testReadBuffer(@TempDir Path tmpPath) throws Exception { + diskIndex = createDataIndex(2, tmpPath.resolve(".index")); TestingSubpartitionConsumerInternalOperation viewNotifier1 = new TestingSubpartitionConsumerInternalOperation(); TestingSubpartitionConsumerInternalOperation viewNotifier2 = @@ -135,13 +138,14 
@@ class HsSubpartitionFileReaderImplTest { @ParameterizedTest @ValueSource(strings = {"LZ4", "LZO", "ZSTD"}) - void testReadBufferCompressed(String compressionFactoryName) throws Exception { + void testReadBufferCompressed(String compressionFactoryName, @TempDir Path tmpPath) + throws Exception { BufferCompressor bufferCompressor = new BufferCompressor(bufferSize, compressionFactoryName); BufferDecompressor bufferDecompressor = new BufferDecompressor(bufferSize, compressionFactoryName); - diskIndex = new HsFileDataIndexImpl(1); + diskIndex = createDataIndex(1, tmpPath.resolve(".index")); TestingSubpartitionConsumerInternalOperation viewNotifier = new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier); @@ -360,8 +364,8 @@ class HsSubpartitionFileReaderImplTest { } @Test - void testCompareTo() throws Exception { - diskIndex = new HsFileDataIndexImpl(2); + void testCompareTo(@TempDir Path tempPath) throws Exception { + diskIndex = createDataIndex(2, tempPath.resolve(".index")); TestingSubpartitionConsumerInternalOperation viewNotifier1 = new TestingSubpartitionConsumerInternalOperation(); TestingSubpartitionConsumerInternalOperation viewNotifier2 = @@ -602,6 +606,10 @@ class HsSubpartitionFileReaderImplTest { BufferReaderWriterUtil.allocatedHeaderBuffer()); } + private HsFileDataIndexImpl createDataIndex(int numSubpartitions, Path indexFilePath) { + return new HsFileDataIndexImpl(numSubpartitions, indexFilePath, 256, Long.MAX_VALUE); + } + private static FileChannel openFileChannel(Path path) throws IOException { return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java index 11ea5c3766b..176d02e649b 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java @@ -113,7 +113,8 @@ class HsSubpartitionViewTest { bufferSize, bufferPool, spillingStrategy, - new HsFileDataIndexImpl(1), + new HsFileDataIndexImpl( + 1, dataFilePath.resolve(".index"), 256, Long.MAX_VALUE), dataFilePath.resolve(".data"), null, 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java index 115f1f5c66b..841ec4a5e80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndex.java @@ -60,6 +60,11 @@ public class TestingFileDataIndex implements HsFileDataIndex { markBufferReadableConsumer.accept(subpartitionId, bufferIndex); } + @Override + public void close() { + // do nothing. + } + public static Builder builder() { return new Builder(); }
