This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0ef0dbd4077161d47b29cc6130a2da3bc2a8048b Author: Yuxin Tan <[email protected]> AuthorDate: Mon Jul 17 15:02:04 2023 +0800 [FLINK-32576][network] Extract file data index cache for hybrid shuffle --- .../generated/all_taskmanager_network_section.html | 4 +- .../netty_shuffle_environment_configuration.html | 4 +- .../NettyShuffleEnvironmentOptions.java | 10 +- .../io/network/NettyShuffleServiceFactory.java | 2 +- .../network/partition/ResultPartitionFactory.java | 8 +- .../partition/hybrid/HsFileDataIndexImpl.java | 125 ++++--- .../HsFileDataIndexSpilledRegionManagerImpl.java | 402 -------------------- .../partition/hybrid/HsResultPartition.java | 2 +- .../hybrid/HybridShuffleConfiguration.java | 20 +- .../FileDataIndexCache.java} | 65 ++-- .../hybrid/index/FileDataIndexRegionHelper.java | 92 +++++ .../FileDataIndexSpilledRegionManager.java} | 17 +- .../FileDataIndexSpilledRegionManagerImpl.java | 415 +++++++++++++++++++++ .../FileRegionWriteReadUtils.java} | 26 +- .../NettyShuffleEnvironmentConfiguration.java | 13 +- .../partition/hybrid/HybridShuffleTestUtils.java | 55 ++- .../TestingFileDataIndexSpilledRegionManager.java | 38 +- .../FileDataIndexCacheTest.java} | 72 ++-- ...FileDataIndexSpilledRegionManagerImplTest.java} | 142 ++++--- .../FileRegionWriteReadUtilsTest.java} | 36 +- .../hybrid/index/TestingFileDataIndexRegion.java | 176 +++++++++ .../index/TestingFileDataIndexRegionHelper.java | 86 +++++ 22 files changed, 1118 insertions(+), 692 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index b82a5bee720..e71647520cd 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -39,10 +39,10 @@ <td>Controls the max number of hybrid retained regions in memory.</td> </tr> <tr> - 
<td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td> + <td><h5>taskmanager.network.hybrid-shuffle.spill-index-region-group-size</h5></td> <td style="word-wrap: break-word;">1024</td> <td>Integer</td> - <td>Controls the segment size(in bytes) of hybrid spilled file data index.</td> + <td>Controls the region group size(in bytes) of hybrid spilled file data index. </td> </tr> <tr> <td><h5>taskmanager.network.max-num-tcp-connections</h5></td> diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html index 9730839c0c4..6cd4621656a 100644 --- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html +++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html @@ -57,10 +57,10 @@ <td>Controls the max number of hybrid retained regions in memory.</td> </tr> <tr> - <td><h5>taskmanager.network.hybrid-shuffle.spill-index-segment-size</h5></td> + <td><h5>taskmanager.network.hybrid-shuffle.spill-index-region-group-size</h5></td> <td style="word-wrap: break-word;">1024</td> <td>Integer</td> - <td>Controls the segment size(in bytes) of hybrid spilled file data index.</td> + <td>Controls the region group size(in bytes) of hybrid spilled file data index. 
</td> </tr> <tr> <td><h5>taskmanager.network.max-num-tcp-connections</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java index 1af9deadf35..d1fbe2474ce 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java @@ -315,14 +315,16 @@ public class NettyShuffleEnvironmentOptions { // this raw value must be changed correspondingly "taskmanager.memory.framework.off-heap.batch-shuffle.size")); - /** Segment size of hybrid spilled file data index. */ + /** Region group size of hybrid spilled file data index. */ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) - public static final ConfigOption<Integer> HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE = - key("taskmanager.network.hybrid-shuffle.spill-index-segment-size") + public static final ConfigOption<Integer> HYBRID_SHUFFLE_SPILLED_INDEX_REGION_GROUP_SIZE = + key("taskmanager.network.hybrid-shuffle.spill-index-region-group-size") .intType() .defaultValue(1024) + .withDeprecatedKeys( + "taskmanager.network.hybrid-shuffle.spill-index-segment-size") .withDescription( - "Controls the segment size(in bytes) of hybrid spilled file data index."); + "Controls the region group size(in bytes) of hybrid spilled file data index. "); /** Max number of hybrid retained regions in memory. 
*/ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index b20f5820e4f..de8ba5f120b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -211,7 +211,7 @@ public class NettyShuffleServiceFactory config.sortShuffleMinParallelism(), config.isSSLEnabled(), config.getMaxOverdraftBuffersPerGate(), - config.getHybridShuffleSpilledIndexSegmentSize(), + config.getHybridShuffleSpilledIndexRegionGroupSize(), config.getHybridShuffleNumRetainedInMemoryRegionsMax()); SingleInputGateFactory singleInputGateFactory = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 069536e8664..4642312da87 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -75,7 +75,7 @@ public class ResultPartitionFactory { private final int sortShuffleMinParallelism; - private final int hybridShuffleSpilledIndexSegmentSize; + private final int hybridShuffleSpilledIndexRegionGroupSize; private final long hybridShuffleNumRetainedInMemoryRegionsMax; @@ -100,7 +100,7 @@ public class ResultPartitionFactory { int sortShuffleMinParallelism, boolean sslEnabled, int maxOverdraftBuffersPerGate, - int hybridShuffleSpilledIndexSegmentSize, + int hybridShuffleSpilledIndexRegionGroupSize, long hybridShuffleNumRetainedInMemoryRegionsMax) { this.partitionManager = partitionManager; @@ -119,7 +119,7 @@ 
public class ResultPartitionFactory { this.sortShuffleMinParallelism = sortShuffleMinParallelism; this.sslEnabled = sslEnabled; this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate; - this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + this.hybridShuffleSpilledIndexRegionGroupSize = hybridShuffleSpilledIndexRegionGroupSize; this.hybridShuffleNumRetainedInMemoryRegionsMax = hybridShuffleNumRetainedInMemoryRegionsMax; } @@ -261,7 +261,7 @@ public class ResultPartitionFactory { resultPartitionType == ResultPartitionType.HYBRID_FULL ? HybridShuffleConfiguration.SpillingStrategyType.FULL : HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE) - .setSpilledIndexSegmentSize(hybridShuffleSpilledIndexSegmentSize) + .setRegionGroupSizeInBytes(hybridShuffleSpilledIndexRegionGroupSize) .setNumRetainedInMemoryRegionsMax(hybridShuffleNumRetainedInMemoryRegionsMax) .build(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index c832d715329..4b4221225cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -18,12 +18,18 @@ package org.apache.flink.runtime.io.network.partition.hybrid; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexCache; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManagerImpl; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils; import org.apache.flink.util.ExceptionUtils; import javax.annotation.concurrent.GuardedBy; import 
javax.annotation.concurrent.ThreadSafe; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -34,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer; import static org.apache.flink.util.Preconditions.checkArgument; /** Default implementation of {@link HsFileDataIndex}. */ @@ -41,25 +48,26 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class HsFileDataIndexImpl implements HsFileDataIndex { @GuardedBy("lock") - private final HsFileDataIndexCache indexCache; + private final FileDataIndexCache<InternalRegion> indexCache; - /** - * {@link HsFileDataIndexCache} is not thread-safe, any access to it needs to hold this lock. - */ + /** {@link FileDataIndexCache} is not thread-safe, any access to it needs to hold this lock. */ private final Object lock = new Object(); public HsFileDataIndexImpl( int numSubpartitions, Path indexFilePath, - int spilledIndexSegmentSize, + int regionGroupSizeInBytes, long numRetainedInMemoryRegionsMax) { this.indexCache = - new HsFileDataIndexCache( + new FileDataIndexCache<>( numSubpartitions, indexFilePath, numRetainedInMemoryRegionsMax, - new HsFileDataIndexSpilledRegionManagerImpl.Factory( - spilledIndexSegmentSize, numRetainedInMemoryRegionsMax)); + new FileDataIndexSpilledRegionManagerImpl.Factory<>( + regionGroupSizeInBytes, + numRetainedInMemoryRegionsMax, + InternalRegion.HEADER_SIZE, + HsFileDataIndexRegionHelper.INSTANCE)); } @Override @@ -163,24 +171,11 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { } /** - * A {@link InternalRegion} represents a series of physically continuous buffers in the file, - * which are from the same subpartition, and has sequential buffer index. 
- * - * <p>The following example illustrates some physically continuous buffers in a file and regions - * upon them, where `x-y` denotes buffer from subpartition x with buffer index y, and `()` - * denotes a region. - * - * <p>(1-1, 1-2), (2-1), (2-2, 2-3), (1-5, 1-6), (1-4) - * - * <p>Note: The file may not contain all the buffers. E.g., 1-3 is missing in the above example. - * - * <p>Note: Buffers in file may have different orders than their buffer index. E.g., 1-4 comes - * after 1-6 in the above example. - * - * <p>Note: This index may not always maintain the longest possible regions. E.g., 2-1, 2-2, 2-3 - * are in two separate regions. + * A {@link InternalRegion} is an implementation of {@link FileDataIndexRegionHelper.Region}. + * Note that this class introduced a new field to indicate whether each buffer in the region is + * released. */ - static class InternalRegion { + public static class InternalRegion implements FileDataIndexRegionHelper.Region { /** * {@link InternalRegion} is consists of header and payload. (firstBufferIndex, * firstBufferOffset, numBuffer) are immutable header part that have fixed size. 
The array @@ -189,30 +184,51 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { public static final int HEADER_SIZE = Integer.BYTES + Long.BYTES + Integer.BYTES; private final int firstBufferIndex; - private final long firstBufferOffset; + private final long regionFileOffset; private final int numBuffers; private final boolean[] released; - private InternalRegion(int firstBufferIndex, long firstBufferOffset, int numBuffers) { + private InternalRegion(int firstBufferIndex, long regionFileOffset, int numBuffers) { this.firstBufferIndex = firstBufferIndex; - this.firstBufferOffset = firstBufferOffset; + this.regionFileOffset = regionFileOffset; this.numBuffers = numBuffers; this.released = new boolean[numBuffers]; Arrays.fill(released, false); } - InternalRegion( - int firstBufferIndex, long firstBufferOffset, int numBuffers, boolean[] released) { + public InternalRegion( + int firstBufferIndex, long regionFileOffset, int numBuffers, boolean[] released) { this.firstBufferIndex = firstBufferIndex; - this.firstBufferOffset = firstBufferOffset; + this.regionFileOffset = regionFileOffset; this.numBuffers = numBuffers; this.released = released; } - boolean containBuffer(int bufferIndex) { + @Override + public boolean containBuffer(int bufferIndex) { return bufferIndex >= firstBufferIndex && bufferIndex < firstBufferIndex + numBuffers; } + @Override + public int getSize() { + return HEADER_SIZE + numBuffers; + } + + @Override + public int getFirstBufferIndex() { + return firstBufferIndex; + } + + @Override + public long getRegionFileOffset() { + return regionFileOffset; + } + + @Override + public int getNumBuffers() { + return numBuffers; + } + private HsFileDataIndex.ReadableRegion toReadableRegion( int bufferIndex, int consumingOffset) { int nSkip = bufferIndex - firstBufferIndex; @@ -223,32 +239,49 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { } ++nReadable; } - return new ReadableRegion(nSkip, nReadable, firstBufferOffset); + return new 
ReadableRegion(nSkip, nReadable, regionFileOffset); } private void markBufferReleased(int bufferIndex) { released[bufferIndex - firstBufferIndex] = true; } - /** Get the total size in bytes of this region, including header and payload. */ - int getSize() { - return HEADER_SIZE + numBuffers; + public boolean[] getReleased() { + return released; } + } - int getFirstBufferIndex() { - return firstBufferIndex; - } + /** + * The implementation of {@link FileDataIndexRegionHelper} to writing a region to the file or + * reading a region from the file. + * + * <p>Note that this type of region's length may be variable because it contains an array to + * indicate each buffer's release state. + */ + public static class HsFileDataIndexRegionHelper + implements FileDataIndexRegionHelper<InternalRegion> { - long getFirstBufferOffset() { - return firstBufferOffset; - } + /** Reusable buffer used to read and write the immutable part of region. */ + private final ByteBuffer regionHeaderBuffer = + allocateAndConfigureBuffer(HsFileDataIndexImpl.InternalRegion.HEADER_SIZE); - int getNumBuffers() { - return numBuffers; + public static final HsFileDataIndexRegionHelper INSTANCE = + new HsFileDataIndexRegionHelper(); + + private HsFileDataIndexRegionHelper() {} + + @Override + public void writeRegionToFile(FileChannel channel, InternalRegion region) + throws IOException { + FileRegionWriteReadUtils.writeHsInternalRegionToFile( + channel, regionHeaderBuffer, region); } - boolean[] getReleased() { - return released; + @Override + public InternalRegion readRegionFromFile(FileChannel channel, long fileOffset) + throws IOException { + return FileRegionWriteReadUtils.readHsInternalRegionFromFile( + channel, regionHeaderBuffer, fileOffset); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java deleted file mode 100644 index c87c0242b7e..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java +++ /dev/null @@ -1,402 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition.hybrid; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; -import org.apache.flink.util.ExceptionUtils; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.TreeMap; -import java.util.function.BiConsumer; - -import static org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer; - -/** - * Default implementation of {@link HsFileDataIndexSpilledRegionManager}. 
This manager will handle - * and spill regions in the following way: - * - * <ul> - * <li>All regions will be written to the same file, namely index file. - * <li>Multiple regions belonging to the same subpartition form a segment. - * <li>The regions in the same segment have no special relationship, but are only related to the - * order in which they are spilled. - * <li>Each segment is independent. Even if the previous segment is not full, the next segment can - * still be allocated. - * <li>If a region has been written to the index file already, spill it again will overwrite the - * previous region. - * <li>The very large region will monopolize a single segment. - * </ul> - * - * <p>The relationships between index file and segment are shown below. - * - * <pre> - * - * - - - - - - - - - Index File - - — - - - - - - - - - - * | | - * | - - — -Segment1 - - - - - - - - Segment2- - - - | - * ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 | | - * | - - - - - - - - - - - - - - - - - - - - - - - - | - * | | - * | - - - - - - - -Segment3 - - - - - - - | - * || Big Region | | - * | - - - - - - - - - - - - - - - - - - - | - * - - - - - - - - - - - - - - - - - - - - - -- - - - - - * </pre> - */ -public class HsFileDataIndexSpilledRegionManagerImpl - implements HsFileDataIndexSpilledRegionManager { - - /** Reusable buffer used to read and write the immutable part of region. */ - private final ByteBuffer regionHeaderBuffer = - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE); - - /** - * List of subpartition's segment meta. Each element is a treeMap contains all {@link - * SegmentMeta}'s of specific subpartition corresponding to the subscript. The value of this - * treeMap is a {@link SegmentMeta}, and the key is minBufferIndex of this segment. Only - * finished(i.e. no longer appended) segment will be put to here. 
- */ - private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas; - - private FileChannel channel; - - /** The Offset of next segment, new segment will start from this offset. */ - private long nextSegmentOffset = 0L; - - private final long[] subpartitionCurrentOffset; - - /** Free space of every subpartition's current segment. */ - private final int[] subpartitionFreeSpaceInBytes; - - /** Metadata of every subpartition's current segment. */ - private final SegmentMeta[] currentSegmentMeta; - - /** - * Default size of segment. If the size of a region is larger than this value, it will be - * allocated and occupy a single segment. - */ - private final int segmentSizeInBytes; - - /** - * This consumer is used to load region to cache. The first parameter is subpartition id, and - * second parameter is the region to load. - */ - private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer; - - /** - * When region in segment needs to be loaded to cache, whether to load all regions of the entire - * segment. 
- */ - private final boolean loadEntireSegmentToCache; - - public HsFileDataIndexSpilledRegionManagerImpl( - int numSubpartitions, - Path indexFilePath, - int segmentSizeInBytes, - long maxCacheCapacity, - BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { - try { - this.channel = - FileChannel.open( - indexFilePath, - StandardOpenOption.CREATE_NEW, - StandardOpenOption.READ, - StandardOpenOption.WRITE); - } catch (IOException e) { - ExceptionUtils.rethrow(e); - } - this.loadEntireSegmentToCache = - shouldLoadEntireSegmentToCache( - numSubpartitions, segmentSizeInBytes, maxCacheCapacity); - this.subpartitionFinishedSegmentMetas = new ArrayList<>(numSubpartitions); - this.subpartitionCurrentOffset = new long[numSubpartitions]; - this.subpartitionFreeSpaceInBytes = new int[numSubpartitions]; - this.currentSegmentMeta = new SegmentMeta[numSubpartitions]; - for (int i = 0; i < numSubpartitions; i++) { - subpartitionFinishedSegmentMetas.add(new TreeMap<>()); - } - this.cacheRegionConsumer = cacheRegionConsumer; - this.segmentSizeInBytes = segmentSizeInBytes; - } - - @Override - public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) { - // first of all, find the region from current writing segment. - SegmentMeta segmentMeta = currentSegmentMeta[subpartition]; - if (segmentMeta != null) { - long regionOffset = - findRegionInSegment(subpartition, bufferIndex, segmentMeta, loadToCache); - if (regionOffset != -1) { - return regionOffset; - } - } - - // next, find the region from finished segments. - TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap = - subpartitionFinishedSegmentMetas.get(subpartition); - // all segments with a minBufferIndex less than or equal to this target buffer index may - // contain the target region. 
- for (SegmentMeta meta : - subpartitionSegmentMetaTreeMap.headMap(bufferIndex, true).values()) { - long regionOffset = findRegionInSegment(subpartition, bufferIndex, meta, loadToCache); - if (regionOffset != -1) { - return regionOffset; - } - } - return -1; - } - - private long findRegionInSegment( - int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) { - if (bufferIndex <= meta.getMaxBufferIndex()) { - try { - return readSegmentAndLoadToCacheIfNeeded( - subpartition, bufferIndex, meta, loadToCache); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - // -1 indicates that target region is not founded from this segment - return -1; - } - - private long readSegmentAndLoadToCacheIfNeeded( - int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) - throws IOException { - // read all regions belong to this segment. - List<Tuple2<InternalRegion, Long>> regionAndOffsets = - readSegment(meta.getOffset(), meta.getNumRegions()); - // -1 indicates that target region is not founded from this segment. - long targetRegionOffset = -1; - InternalRegion targetRegion = null; - // traverse all regions to find target. - Iterator<Tuple2<InternalRegion, Long>> it = regionAndOffsets.iterator(); - while (it.hasNext()) { - Tuple2<InternalRegion, Long> regionAndOffset = it.next(); - InternalRegion region = regionAndOffset.f0; - // whether the region contains this buffer. - if (region.containBuffer(bufferIndex)) { - // target region is founded. - targetRegion = region; - targetRegionOffset = regionAndOffset.f1; - it.remove(); - } - } - - // target region is founded and need to load to cache. - if (targetRegion != null && loadToCache) { - if (loadEntireSegmentToCache) { - // first of all, load all regions except target to cache. 
- regionAndOffsets.forEach( - (regionAndOffsetTuple) -> - cacheRegionConsumer.accept(subpartition, regionAndOffsetTuple.f0)); - // load target region to cache in the end, this is to prevent the target - // from being eliminated. - cacheRegionConsumer.accept(subpartition, targetRegion); - } else { - // only load target region to cache. - cacheRegionConsumer.accept(subpartition, targetRegion); - } - } - // return the offset of target region. - return targetRegionOffset; - } - - @Override - public void appendOrOverwriteRegion(int subpartition, InternalRegion newRegion) - throws IOException { - // This method will only be called when we want to eliminate a region. We can't let the - // region be reloaded into the cache, otherwise it will lead to an infinite loop. - long oldRegionOffset = findRegion(subpartition, newRegion.getFirstBufferIndex(), false); - if (oldRegionOffset != -1) { - // if region is already exists in file, overwrite it. - writeRegionToOffset(oldRegionOffset, newRegion); - } else { - // otherwise, append region to segment. - appendRegion(subpartition, newRegion); - } - } - - @Override - public void close() throws IOException { - if (channel != null) { - channel.close(); - } - } - - private static boolean shouldLoadEntireSegmentToCache( - int numSubpartitions, int segmentSizeInBytes, long maxCacheCapacity) { - // If the cache can put at least two segments (one for reading and one for writing) for each - // subpartition, it is reasonable to load the entire segment into memory, which can improve - // the cache hit rate. On the contrary, if the cache capacity is small, loading a large - // number of regions will lead to performance degradation,only the target region should be - // loaded. 
- return ((long) 2 * numSubpartitions * segmentSizeInBytes) / InternalRegion.HEADER_SIZE - <= maxCacheCapacity; - } - - private void appendRegion(int subpartition, InternalRegion region) throws IOException { - int regionSize = region.getSize(); - // check whether we have enough space to append this region. - if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) { - // No enough free space, start a new segment. Note that if region is larger than - // segment's size, this will start a new segment only contains the big region. - startNewSegment(subpartition, Math.max(regionSize, segmentSizeInBytes)); - } - // spill this region to current offset of file index. - writeRegionToOffset(subpartitionCurrentOffset[subpartition], region); - // a new region was appended to segment, update it. - updateSegment(subpartition, region); - } - - private void writeRegionToOffset(long offset, InternalRegion region) throws IOException { - channel.position(offset); - InternalRegionWriteReadUtils.writeRegionToFile(channel, regionHeaderBuffer, region); - } - - private void startNewSegment(int subpartition, int newSegmentSize) { - SegmentMeta oldSegmentMeta = currentSegmentMeta[subpartition]; - currentSegmentMeta[subpartition] = new SegmentMeta(nextSegmentOffset); - subpartitionCurrentOffset[subpartition] = nextSegmentOffset; - nextSegmentOffset += newSegmentSize; - subpartitionFreeSpaceInBytes[subpartition] = newSegmentSize; - if (oldSegmentMeta != null) { - // put the finished segment to subpartitionFinishedSegmentMetas. 
- subpartitionFinishedSegmentMetas - .get(subpartition) - .put(oldSegmentMeta.minBufferIndex, oldSegmentMeta); - } - } - - private void updateSegment(int subpartition, InternalRegion region) { - int regionSize = region.getSize(); - subpartitionFreeSpaceInBytes[subpartition] -= regionSize; - subpartitionCurrentOffset[subpartition] += regionSize; - SegmentMeta segmentMeta = currentSegmentMeta[subpartition]; - segmentMeta.addRegion( - region.getFirstBufferIndex(), - region.getFirstBufferIndex() + region.getNumBuffers() - 1); - } - - /** - * Read segment from index file. - * - * @param offset offset of this segment. - * @param numRegions number of regions of this segment. - * @return List of all regions and its offset belong to this segment. - */ - private List<Tuple2<InternalRegion, Long>> readSegment(long offset, int numRegions) - throws IOException { - List<Tuple2<InternalRegion, Long>> regionAndOffsets = new ArrayList<>(); - for (int i = 0; i < numRegions; i++) { - InternalRegion region = - InternalRegionWriteReadUtils.readRegionFromFile( - channel, regionHeaderBuffer, offset); - regionAndOffsets.add(Tuple2.of(region, offset)); - offset += region.getSize(); - } - return regionAndOffsets; - } - - /** - * Metadata of spilled regions segment. When a segment is finished(i.e. no longer appended), its - * corresponding {@link SegmentMeta} becomes immutable. - */ - private static class SegmentMeta { - /** - * Minimum buffer index of this segment. It is the smallest bufferIndex(inclusive) in all - * regions belong to this segment. - */ - private int minBufferIndex; - - /** - * Maximum buffer index of this segment. It is the largest bufferIndex(inclusive) in all - * regions belong to this segment. - */ - private int maxBufferIndex; - - /** Number of regions belong to this segment. */ - private int numRegions; - - /** The index file offset of this segment. 
*/ - private final long offset; - - public SegmentMeta(long offset) { - this.offset = offset; - this.minBufferIndex = Integer.MAX_VALUE; - this.maxBufferIndex = 0; - this.numRegions = 0; - } - - public int getMaxBufferIndex() { - return maxBufferIndex; - } - - public long getOffset() { - return offset; - } - - public int getNumRegions() { - return numRegions; - } - - public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) { - if (firstBufferIndexOfRegion < minBufferIndex) { - this.minBufferIndex = firstBufferIndexOfRegion; - } - if (maxBufferIndexOfRegion > maxBufferIndex) { - this.maxBufferIndex = maxBufferIndexOfRegion; - } - this.numRegions++; - } - } - - /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */ - public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory { - private final int segmentSizeInBytes; - - private final long maxCacheCapacity; - - public Factory(int segmentSizeInBytes, long maxCacheCapacity) { - this.segmentSizeInBytes = segmentSizeInBytes; - this.maxCacheCapacity = maxCacheCapacity; - } - - @Override - public HsFileDataIndexSpilledRegionManager create( - int numSubpartitions, - Path indexFilePath, - BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { - return new HsFileDataIndexSpilledRegionManagerImpl( - numSubpartitions, - indexFilePath, - segmentSizeInBytes, - maxCacheCapacity, - cacheRegionConsumer); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 602e13a62fa..c0298325caf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -116,7 +116,7 @@ public class HsResultPartition extends ResultPartition { new 
HsFileDataIndexImpl( isBroadcastOnly ? 1 : numSubpartitions, new File(dataFileBashPath + INDEX_FILE_SUFFIX).toPath(), - hybridShuffleConfiguration.getSpilledIndexSegmentSize(), + hybridShuffleConfiguration.getRegionGroupSizeInBytes(), hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax()); this.hybridShuffleConfiguration = hybridShuffleConfiguration; this.isBroadcastOnly = isBroadcastOnly; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java index 27667afcd47..c41f19e9e24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleConfiguration.java @@ -40,7 +40,7 @@ public class HybridShuffleConfiguration { private static final long DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX = Long.MAX_VALUE; - private static final int DEFAULT_SPILLED_INDEX_SEGMENT_SIZE = 256; + private static final int DEFAULT_REGION_GROUP_SIZE_IN_BYTES = 256; private static final SpillingStrategyType DEFAULT_SPILLING_STRATEGY_NAME = SpillingStrategyType.FULL; @@ -55,7 +55,7 @@ public class HybridShuffleConfiguration { private final long numRetainedInMemoryRegionsMax; - private final int spilledIndexSegmentSize; + private final int regionGroupSizeInBytes; private final long bufferPoolSizeCheckIntervalMs; @@ -87,7 +87,7 @@ public class HybridShuffleConfiguration { SpillingStrategyType spillingStrategyType, long bufferPoolSizeCheckIntervalMs, long numRetainedInMemoryRegionsMax, - int spilledIndexSegmentSize) { + int regionGroupSizeInBytes) { this.maxBuffersReadAhead = maxBuffersReadAhead; this.bufferRequestTimeout = bufferRequestTimeout; this.maxRequestedBuffers = maxRequestedBuffers; @@ -100,7 +100,7 @@ public class HybridShuffleConfiguration 
{ this.spillingStrategyType = spillingStrategyType; this.bufferPoolSizeCheckIntervalMs = bufferPoolSizeCheckIntervalMs; this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; - this.spilledIndexSegmentSize = spilledIndexSegmentSize; + this.regionGroupSizeInBytes = regionGroupSizeInBytes; } public static Builder builder(int numSubpartitions, int numBuffersPerRequest) { @@ -172,8 +172,8 @@ public class HybridShuffleConfiguration { } /** Segment size of hybrid spilled file data index. */ - public int getSpilledIndexSegmentSize() { - return spilledIndexSegmentSize; + public int getRegionGroupSizeInBytes() { + return regionGroupSizeInBytes; } /** Max number of hybrid retained regions in memory. */ @@ -211,7 +211,7 @@ public class HybridShuffleConfiguration { private long numRetainedInMemoryRegionsMax = DEFAULT_NUM_RETAINED_IN_MEMORY_REGIONS_MAX; - private int spilledIndexSegmentSize = DEFAULT_SPILLED_INDEX_SEGMENT_SIZE; + private int regionGroupSizeInBytes = DEFAULT_REGION_GROUP_SIZE_IN_BYTES; private final int numSubpartitions; @@ -275,8 +275,8 @@ public class HybridShuffleConfiguration { return this; } - public Builder setSpilledIndexSegmentSize(int spilledIndexSegmentSize) { - this.spilledIndexSegmentSize = spilledIndexSegmentSize; + public Builder setRegionGroupSizeInBytes(int regionGroupSizeInBytes) { + this.regionGroupSizeInBytes = regionGroupSizeInBytes; return this; } @@ -293,7 +293,7 @@ public class HybridShuffleConfiguration { spillingStrategyType, bufferPoolSizeCheckIntervalMs, numRetainedInMemoryRegionsMax, - spilledIndexSegmentSize); + regionGroupSizeInBytes); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java similarity index 77% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java index 171007e3220..24029da69a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCache.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.partition.hybrid; +package org.apache.flink.runtime.io.network.partition.hybrid.index; -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IOUtils; @@ -42,25 +41,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * and automatically caches some indexes in memory. When there are too many cached indexes, it is * this class's responsibility to decide and eliminate some indexes to disk. */ -public class HsFileDataIndexCache { +public class FileDataIndexCache<T extends FileDataIndexRegionHelper.Region> { /** - * This struct stores all in memory {@link InternalRegion}s. Each element is a treeMap contains - * all in memory {@link InternalRegion}'s of specific subpartition corresponding to the - * subscript. The value of this treeMap is a {@link InternalRegion}, and the key is - * firstBufferIndex of this region. Only cached in memory region will be put to here. + * This struct stores all in memory {@link FileDataIndexRegionHelper.Region}s. Each element is a + * treeMap contains all in memory {@link FileDataIndexRegionHelper.Region}'s of specific + * subpartition corresponding to the subscript. The value of this treeMap is a {@link + * FileDataIndexRegionHelper.Region}, and the key is firstBufferIndex of this region. Only + * cached in memory region will be put to here. 
*/ - private final List<TreeMap<Integer, InternalRegion>> - subpartitionFirstBufferIndexInternalRegions; + private final List<TreeMap<Integer, T>> subpartitionFirstBufferIndexRegions; /** * This cache is used to help eliminate regions from memory. It is only maintains the key of * each in memory region, the value is just a placeholder. Note that this internal cache must be - * consistent with subpartitionFirstBufferIndexInternalRegions, that means both of them must add + * consistent with subpartitionFirstBufferIndexHsBaseRegions, that means both of them must add * or delete elements at the same time. */ private final Cache<CachedRegionKey, Object> internalCache; - private final HsFileDataIndexSpilledRegionManager spilledRegionManager; + private final FileDataIndexSpilledRegionManager<T> spilledRegionManager; private final Path indexFilePath; @@ -70,14 +69,14 @@ public class HsFileDataIndexCache { */ public static final Object PLACEHOLDER = new Object(); - public HsFileDataIndexCache( + public FileDataIndexCache( int numSubpartitions, Path indexFilePath, long numRetainedInMemoryRegionsMax, - HsFileDataIndexSpilledRegionManager.Factory spilledRegionManagerFactory) { - this.subpartitionFirstBufferIndexInternalRegions = new ArrayList<>(numSubpartitions); + FileDataIndexSpilledRegionManager.Factory<T> spilledRegionManagerFactory) { + this.subpartitionFirstBufferIndexRegions = new ArrayList<>(numSubpartitions); for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) { - subpartitionFirstBufferIndexInternalRegions.add(new TreeMap<>()); + subpartitionFirstBufferIndexRegions.add(new TreeMap<>()); } this.internalCache = CacheBuilder.newBuilder() @@ -93,7 +92,7 @@ public class HsFileDataIndexCache { if (!getCachedRegionContainsTargetBufferIndex( subpartition, region.getFirstBufferIndex()) .isPresent()) { - subpartitionFirstBufferIndexInternalRegions + subpartitionFirstBufferIndexRegions .get(subpartition) .put(region.getFirstBufferIndex(), region); 
internalCache.put( @@ -117,12 +116,12 @@ public class HsFileDataIndexCache { * @return If target region can be founded from memory or disk, return optional contains target * region. Otherwise, return {@code Optional#empty()}; */ - public Optional<InternalRegion> get(int subpartitionId, int bufferIndex) { + public Optional<T> get(int subpartitionId, int bufferIndex) { // first of all, try to get region in memory. - Optional<InternalRegion> regionOpt = + Optional<T> regionOpt = getCachedRegionContainsTargetBufferIndex(subpartitionId, bufferIndex); if (regionOpt.isPresent()) { - InternalRegion region = regionOpt.get(); + T region = regionOpt.get(); checkNotNull( // this is needed for cache entry remove algorithm like LRU. internalCache.getIfPresent( @@ -139,22 +138,20 @@ public class HsFileDataIndexCache { * Put regions to cache. * * @param subpartition the subpartition's id of regions. - * @param internalRegions regions to be cached. + * @param fileRegions regions to be cached. */ - public void put(int subpartition, List<InternalRegion> internalRegions) { - TreeMap<Integer, InternalRegion> treeMap = - subpartitionFirstBufferIndexInternalRegions.get(subpartition); - for (InternalRegion internalRegion : internalRegions) { + public void put(int subpartition, List<T> fileRegions) { + TreeMap<Integer, T> treeMap = subpartitionFirstBufferIndexRegions.get(subpartition); + for (T region : fileRegions) { internalCache.put( - new CachedRegionKey(subpartition, internalRegion.getFirstBufferIndex()), - PLACEHOLDER); - treeMap.put(internalRegion.getFirstBufferIndex(), internalRegion); + new CachedRegionKey(subpartition, region.getFirstBufferIndex()), PLACEHOLDER); + treeMap.put(region.getFirstBufferIndex(), region); } } /** - * Close {@link HsFileDataIndexCache}, this will delete the index file. After that, the index - * can no longer be read or written. + * Close {@link FileDataIndexCache}, this will delete the index file. 
After that, the index can + * no longer be read or written. */ public void close() throws IOException { spilledRegionManager.close(); @@ -165,8 +162,8 @@ public class HsFileDataIndexCache { private void handleRemove(RemovalNotification<CachedRegionKey, Object> removedEntry) { CachedRegionKey removedKey = removedEntry.getKey(); // remove the corresponding region from memory. - InternalRegion removedRegion = - subpartitionFirstBufferIndexInternalRegions + T removedRegion = + subpartitionFirstBufferIndexRegions .get(removedKey.getSubpartition()) .remove(removedKey.getFirstBufferIndex()); @@ -175,7 +172,7 @@ public class HsFileDataIndexCache { writeRegion(removedKey.getSubpartition(), removedRegion); } - private void writeRegion(int subpartition, InternalRegion region) { + private void writeRegion(int subpartition, T region) { try { spilledRegionManager.appendOrOverwriteRegion(subpartition, region); } catch (IOException e) { @@ -191,10 +188,10 @@ public class HsFileDataIndexCache { * @return If target region is cached in memory, return optional contains target region. 
* Otherwise, return {@code Optional#empty()}; */ - private Optional<InternalRegion> getCachedRegionContainsTargetBufferIndex( + private Optional<T> getCachedRegionContainsTargetBufferIndex( int subpartitionId, int bufferIndex) { return Optional.ofNullable( - subpartitionFirstBufferIndexInternalRegions + subpartitionFirstBufferIndexRegions .get(subpartitionId) .floorEntry(bufferIndex)) .map(Map.Entry::getValue) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java new file mode 100644 index 00000000000..c0d4270e109 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.index; + +import java.io.IOException; +import java.nio.channels.FileChannel; + +/** + * {@link FileDataIndexRegionHelper} is responsible for writing a {@link Region} to the file or + * reading a {@link Region} from file. 
+ */ +public interface FileDataIndexRegionHelper<T extends FileDataIndexRegionHelper.Region> { + + /** + * Write the region to the file. + * + * @param channel the file channel to write the region + * @param region the region to be written to the file + */ + void writeRegionToFile(FileChannel channel, T region) throws IOException; + + /** + * Read a region from the file. + * + * @param channel the file channel to read the region + * @param fileOffset the current region data is from this file offset, so start reading the file + * from the offset when reading the region + * @return the region read from the file + */ + T readRegionFromFile(FileChannel channel, long fileOffset) throws IOException; + + /** + * A {@link Region} Represents a series of buffers that are: + * + * <ul> + * <li>From the same subpartition + * <li>Logically (i.e. buffer index) consecutive + * <li>Physically (i.e. offset in the file) consecutive + * </ul> + * + * <p>The following example illustrates some physically continuous buffers in a file and regions + * upon them, where `x-y` denotes buffer from subpartition x with buffer index y, and `()` + * denotes a region. + * + * <p>(1-1, 1-2), (2-1), (2-2, 2-3), (1-5, 1-6), (1-4) + * + * <p>Note: The file may not contain all the buffers. E.g., 1-3 is missing in the above example. + * + * <p>Note: Buffers in file may have different orders than their buffer index. E.g., 1-4 comes + * after 1-6 in the above example. + * + * <p>Note: This index may not always maintain the longest possible regions. E.g., 2-1, 2-2, 2-3 + * are in two separate regions. + */ + interface Region { + + /** Get the total size in bytes of this region, including the fields and the buffers. */ + int getSize(); + + /** Get the first buffer index of this region. */ + int getFirstBufferIndex(); + + /** Get the file start offset of this region. */ + long getRegionFileOffset(); + + /** Get the number of buffers in this region. 
*/ + int getNumBuffers(); + + /** + * Whether the current region contain the buffer. + * + * @param bufferIndex the specific buffer index + */ + boolean containBuffer(int bufferIndex); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java similarity index 77% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java index e0a413a5c9f..2e3e6064a82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManager.java @@ -16,23 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.partition.hybrid; - -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +package org.apache.flink.runtime.io.network.partition.hybrid.index; import java.io.IOException; import java.nio.file.Path; import java.util.function.BiConsumer; /** This class is responsible for spilling region to disk and managing these spilled regions. */ -public interface HsFileDataIndexSpilledRegionManager extends AutoCloseable { +public interface FileDataIndexSpilledRegionManager<T extends FileDataIndexRegionHelper.Region> + extends AutoCloseable { /** * Write this region to index file. If target region already spilled, overwrite it. * * @param subpartition the subpartition id of this region. * @param region the region to be spilled to index file. 
*/ - void appendOrOverwriteRegion(int subpartition, InternalRegion region) throws IOException; + void appendOrOverwriteRegion(int subpartition, T region) throws IOException; /** * Find the region contains target bufferIndex and belong to target subpartition. @@ -48,11 +47,11 @@ public interface HsFileDataIndexSpilledRegionManager extends AutoCloseable { /** Close this spilled region manager. */ void close() throws IOException; - /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */ - interface Factory { - HsFileDataIndexSpilledRegionManager create( + /** Factory of {@link FileDataIndexSpilledRegionManager}. */ + interface Factory<T extends FileDataIndexRegionHelper.Region> { + FileDataIndexSpilledRegionManager<T> create( int numSubpartitions, Path indexFilePath, - BiConsumer<Integer, InternalRegion> cacheRegionConsumer); + BiConsumer<Integer, T> cacheRegionConsumer); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java new file mode 100644 index 00000000000..d468e2f2dd2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.index; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.TreeMap; +import java.util.function.BiConsumer; + +/** + * Default implementation of {@link FileDataIndexSpilledRegionManager}. This manager will handle and + * spill regions in the following way: + * + * <ul> + * <li>All regions will be written to the same file, namely index file. + * <li>Multiple regions belonging to the same subpartition form a region group. + * <li>The regions in the same region group have no special relationship, but are only related to + * the order in which they are spilled. + * <li>Each region group is independent. Even if the previous region group is not full, the next + * region group can still be allocated. + * <li>If a region has been written to the index file already, spill it again will overwrite the + * previous region. + * <li>The very large region will monopolize a single region group. + * </ul> + * + * <p>The relationships between index file and region group are shown below. 
+ * + * <pre> + * + * - - - - - - - - - Index File - - — - - - - - - - - - + * | | + * | - - — -RegionGroup1 - - - - RegionGroup2- - - - | + * ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 | | + * | - - - - - - - - - - - - - - - - - - - - - - - - | + * | | + * | - - - - - - - -RegionGroup3 - - - - - | + * || Big Region | | + * | - - - - - - - - - - - - - - - - - - - | + * - - - - - - - - - - - - - - - - - - - - - -- - - - - + * </pre> + */ +public class FileDataIndexSpilledRegionManagerImpl<T extends FileDataIndexRegionHelper.Region> + implements FileDataIndexSpilledRegionManager<T> { + + /** + * List of subpartition's region group meta. Each element is a treeMap contains all {@link + * RegionGroup}'s of specific subpartition corresponding to the subscript. The value of this + * treeMap is a {@link RegionGroup}, and the key is minBufferIndex of this region group. Only + * finished(i.e. no longer appended) region group will be put to here. + */ + private final List<TreeMap<Integer, RegionGroup>> subpartitionFinishedRegionGroupMetas; + + private FileChannel channel; + + /** The Offset of next region group, new region group will start from this offset. */ + private long nextRegionGroupOffset = 0L; + + private final long[] subpartitionCurrentOffset; + + /** Free space of every subpartition's current region group. */ + private final int[] subpartitionFreeSpaceInBytes; + + /** Metadata of every subpartition's current region group. */ + private final RegionGroup[] currentRegionGroup; + + /** + * Default size of region group. If the size of a region is larger than this value, it will be + * allocated and occupy a single region group. + */ + private final int regionGroupSizeInBytes; + + /** + * This consumer is used to load region to cache. The first parameter is subpartition id, and + * second parameter is the region to load. 
+ */ + private final BiConsumer<Integer, T> cacheRegionConsumer; + + private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper; + + /** + * When region in region group needs to be loaded to cache, whether to load all regions of the + * entire region group. + */ + private final boolean loadEntireRegionGroupToCache; + + public FileDataIndexSpilledRegionManagerImpl( + int numSubpartitions, + Path indexFilePath, + int regionGroupSizeInBytes, + long maxCacheCapacity, + int regionHeaderSize, + BiConsumer<Integer, T> cacheRegionConsumer, + FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) { + try { + this.channel = + FileChannel.open( + indexFilePath, + StandardOpenOption.CREATE_NEW, + StandardOpenOption.READ, + StandardOpenOption.WRITE); + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } + this.loadEntireRegionGroupToCache = + shouldLoadEntireRegionGroupToCache( + numSubpartitions, + regionGroupSizeInBytes, + maxCacheCapacity, + regionHeaderSize); + this.subpartitionFinishedRegionGroupMetas = new ArrayList<>(numSubpartitions); + this.subpartitionCurrentOffset = new long[numSubpartitions]; + this.subpartitionFreeSpaceInBytes = new int[numSubpartitions]; + this.currentRegionGroup = new RegionGroup[numSubpartitions]; + for (int i = 0; i < numSubpartitions; i++) { + subpartitionFinishedRegionGroupMetas.add(new TreeMap<>()); + } + this.cacheRegionConsumer = cacheRegionConsumer; + this.fileDataIndexRegionHelper = fileDataIndexRegionHelper; + this.regionGroupSizeInBytes = regionGroupSizeInBytes; + } + + @Override + public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) { + // first of all, find the region from current writing region group. 
+ RegionGroup regionGroup = currentRegionGroup[subpartition]; + if (regionGroup != null) { + long regionOffset = + findRegionInRegionGroup(subpartition, bufferIndex, regionGroup, loadToCache); + if (regionOffset != -1) { + return regionOffset; + } + } + + // next, find the region from finished region groups. + TreeMap<Integer, RegionGroup> subpartitionRegionGroupMetaTreeMap = + subpartitionFinishedRegionGroupMetas.get(subpartition); + // all region groups with a minBufferIndex less than or equal to this target buffer index + // may contain the target region. + for (RegionGroup meta : + subpartitionRegionGroupMetaTreeMap.headMap(bufferIndex, true).values()) { + long regionOffset = + findRegionInRegionGroup(subpartition, bufferIndex, meta, loadToCache); + if (regionOffset != -1) { + return regionOffset; + } + } + return -1; + } + + private long findRegionInRegionGroup( + int subpartition, int bufferIndex, RegionGroup meta, boolean loadToCache) { + if (bufferIndex <= meta.getMaxBufferIndex()) { + try { + return readRegionGroupAndLoadToCacheIfNeeded( + subpartition, bufferIndex, meta, loadToCache); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + // -1 indicates that target region is not founded from this region group + return -1; + } + + private long readRegionGroupAndLoadToCacheIfNeeded( + int subpartition, int bufferIndex, RegionGroup meta, boolean loadToCache) + throws IOException { + // read all regions belong to this region group. + List<Tuple2<T, Long>> regionAndOffsets = + readRegionGroup(meta.getOffset(), meta.getNumRegions()); + // -1 indicates that target region is not founded from this region group. + long targetRegionOffset = -1; + T targetRegion = null; + // traverse all regions to find target. + Iterator<Tuple2<T, Long>> it = regionAndOffsets.iterator(); + while (it.hasNext()) { + Tuple2<T, Long> regionAndOffset = it.next(); + T region = regionAndOffset.f0; + // whether the region contains this buffer. 
+ if (region.containBuffer(bufferIndex)) { + // target region is founded. + targetRegion = region; + targetRegionOffset = regionAndOffset.f1; + it.remove(); + } + } + + // target region is founded and need to load to cache. + if (targetRegion != null && loadToCache) { + if (loadEntireRegionGroupToCache) { + // first of all, load all regions except target to cache. + regionAndOffsets.forEach( + (regionAndOffsetTuple) -> + cacheRegionConsumer.accept(subpartition, regionAndOffsetTuple.f0)); + // load target region to cache in the end, this is to prevent the target + // from being eliminated. + cacheRegionConsumer.accept(subpartition, targetRegion); + } else { + // only load target region to cache. + cacheRegionConsumer.accept(subpartition, targetRegion); + } + } + // return the offset of target region. + return targetRegionOffset; + } + + @Override + public void appendOrOverwriteRegion(int subpartition, T newRegion) throws IOException { + // This method will only be called when we want to eliminate a region. We can't let the + // region be reloaded into the cache, otherwise it will lead to an infinite loop. + long oldRegionOffset = findRegion(subpartition, newRegion.getFirstBufferIndex(), false); + if (oldRegionOffset != -1) { + // if region is already exists in file, overwrite it. + writeRegionToOffset(oldRegionOffset, newRegion); + } else { + // otherwise, append region to region group. + appendRegion(subpartition, newRegion); + } + } + + @Override + public void close() throws IOException { + if (channel != null) { + channel.close(); + } + } + + private static boolean shouldLoadEntireRegionGroupToCache( + int numSubpartitions, + int regionGroupSizeInBytes, + long maxCacheCapacity, + int regionHeaderSize) { + // If the cache can put at least two region groups (one for reading and one for writing) for + // each subpartition, it is reasonable to load the entire region group into memory, which + // can improve the cache hit rate. 
On the contrary, if the cache capacity is small, loading + // a large number of regions will lead to performance degradation,only the target region + // should be loaded. + return ((long) 2 * numSubpartitions * regionGroupSizeInBytes) / regionHeaderSize + <= maxCacheCapacity; + } + + private void appendRegion(int subpartition, T region) throws IOException { + int regionSize = region.getSize(); + // check whether we have enough space to append this region. + if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) { + // No enough free space, start a new region group. Note that if region is larger than + // region group's size, this will start a new region group only contains the big region. + startNewRegionGroup(subpartition, Math.max(regionSize, regionGroupSizeInBytes)); + } + // spill this region to current offset of file index. + writeRegionToOffset(subpartitionCurrentOffset[subpartition], region); + // a new region was appended to region group, update it. + updateRegionGroup(subpartition, region); + } + + private void writeRegionToOffset(long offset, T region) throws IOException { + channel.position(offset); + fileDataIndexRegionHelper.writeRegionToFile(channel, region); + } + + private void startNewRegionGroup(int subpartition, int newRegionGroupSize) { + RegionGroup oldRegionGroup = currentRegionGroup[subpartition]; + currentRegionGroup[subpartition] = new RegionGroup(nextRegionGroupOffset); + subpartitionCurrentOffset[subpartition] = nextRegionGroupOffset; + nextRegionGroupOffset += newRegionGroupSize; + subpartitionFreeSpaceInBytes[subpartition] = newRegionGroupSize; + if (oldRegionGroup != null) { + // put the finished region group to subpartitionFinishedRegionGroupMetas. 
+ subpartitionFinishedRegionGroupMetas + .get(subpartition) + .put(oldRegionGroup.minBufferIndex, oldRegionGroup); + } + } + + private void updateRegionGroup(int subpartition, T region) { + int regionSize = region.getSize(); + subpartitionFreeSpaceInBytes[subpartition] -= regionSize; + subpartitionCurrentOffset[subpartition] += regionSize; + RegionGroup regionGroup = currentRegionGroup[subpartition]; + regionGroup.addRegion( + region.getFirstBufferIndex(), + region.getFirstBufferIndex() + region.getNumBuffers() - 1); + } + + /** + * Read region group from index file. + * + * @param offset offset of this region group. + * @param numRegions number of regions of this region group. + * @return List of all regions and its offset belong to this region group. + */ + private List<Tuple2<T, Long>> readRegionGroup(long offset, int numRegions) throws IOException { + List<Tuple2<T, Long>> regionAndOffsets = new ArrayList<>(); + for (int i = 0; i < numRegions; i++) { + T region = fileDataIndexRegionHelper.readRegionFromFile(channel, offset); + regionAndOffsets.add(Tuple2.of(region, offset)); + offset += region.getSize(); + } + return regionAndOffsets; + } + + /** + * Metadata of spilled regions region group. When a region group is finished(i.e. no longer + * appended), its corresponding {@link RegionGroup} becomes immutable. + */ + private static class RegionGroup { + /** + * Minimum buffer index of this region group. It is the smallest bufferIndex(inclusive) in + * all regions belong to this region group. + */ + private int minBufferIndex; + + /** + * Maximum buffer index of this region group. It is the largest bufferIndex(inclusive) in + * all regions belong to this region group. + */ + private int maxBufferIndex; + + /** Number of regions belong to this region group. */ + private int numRegions; + + /** The index file offset of this region group. 
*/ + private final long offset; + + public RegionGroup(long offset) { + this.offset = offset; + this.minBufferIndex = Integer.MAX_VALUE; + this.maxBufferIndex = 0; + this.numRegions = 0; + } + + public int getMaxBufferIndex() { + return maxBufferIndex; + } + + public long getOffset() { + return offset; + } + + public int getNumRegions() { + return numRegions; + } + + public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) { + if (firstBufferIndexOfRegion < minBufferIndex) { + this.minBufferIndex = firstBufferIndexOfRegion; + } + if (maxBufferIndexOfRegion > maxBufferIndex) { + this.maxBufferIndex = maxBufferIndexOfRegion; + } + this.numRegions++; + } + } + + /** Factory of {@link FileDataIndexSpilledRegionManager}. */ + public static class Factory<T extends FileDataIndexRegionHelper.Region> + implements FileDataIndexSpilledRegionManager.Factory<T> { + private final int regionGroupSizeInBytes; + + private final long maxCacheCapacity; + + private final int regionHeaderSize; + + private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper; + + public Factory( + int regionGroupSizeInBytes, + long maxCacheCapacity, + int regionHeaderSize, + FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) { + this.regionGroupSizeInBytes = regionGroupSizeInBytes; + this.maxCacheCapacity = maxCacheCapacity; + this.regionHeaderSize = regionHeaderSize; + this.fileDataIndexRegionHelper = fileDataIndexRegionHelper; + } + + @Override + public FileDataIndexSpilledRegionManager<T> create( + int numSubpartitions, + Path indexFilePath, + BiConsumer<Integer, T> cacheRegionConsumer) { + return new FileDataIndexSpilledRegionManagerImpl<>( + numSubpartitions, + indexFilePath, + regionGroupSizeInBytes, + maxCacheCapacity, + regionHeaderSize, + cacheRegionConsumer, + fileDataIndexRegionHelper); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java similarity index 79% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java index 08726fb8afb..443bdbf863e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.partition.hybrid; +package org.apache.flink.runtime.io.network.partition.hybrid.index; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; @@ -26,8 +26,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; -/** Utils for read and write {@link InternalRegion}. */ -public class InternalRegionWriteReadUtils { +/** Utils for read and write {@link FileDataIndexRegionHelper.Region}. */ +public class FileRegionWriteReadUtils { /** * Allocate a buffer with specific size and configure it to native order. @@ -44,18 +44,21 @@ public class InternalRegionWriteReadUtils { /** * Write {@link InternalRegion} to {@link FileChannel}. * + * <p>Note that this type of region's length may be variable because it contains an array to + * indicate each buffer's release state. + * * @param channel the file's channel to write. * @param headerBuffer the buffer to write {@link InternalRegion}'s header. * @param region the region to be written to channel. 
*/ - public static void writeRegionToFile( + public static void writeHsInternalRegionToFile( FileChannel channel, ByteBuffer headerBuffer, InternalRegion region) throws IOException { // write header buffer. headerBuffer.clear(); headerBuffer.putInt(region.getFirstBufferIndex()); headerBuffer.putInt(region.getNumBuffers()); - headerBuffer.putLong(region.getFirstBufferOffset()); + headerBuffer.putLong(region.getRegionFileOffset()); headerBuffer.flip(); // write payload buffer. @@ -76,22 +79,25 @@ public class InternalRegionWriteReadUtils { /** * Read {@link InternalRegion} from {@link FileChannel}. * + * <p>Note that this type of region's length may be variable because it contains an array to + * indicate each buffer's release state. + * * @param channel the channel to read. * @param headerBuffer the buffer to read {@link InternalRegion}'s header. - * @param position position to start read. + * @param fileOffset the file offset to start read. * @return the {@link InternalRegion} that read from this channel. 
*/ - public static InternalRegion readRegionFromFile( - FileChannel channel, ByteBuffer headerBuffer, long position) throws IOException { + public static InternalRegion readHsInternalRegionFromFile( + FileChannel channel, ByteBuffer headerBuffer, long fileOffset) throws IOException { headerBuffer.clear(); - BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, position); + BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, fileOffset); headerBuffer.flip(); int firstBufferIndex = headerBuffer.getInt(); int numBuffers = headerBuffer.getInt(); long firstBufferOffset = headerBuffer.getLong(); ByteBuffer payloadBuffer = allocateAndConfigureBuffer(numBuffers); BufferReaderWriterUtil.readByteBufferFully( - channel, payloadBuffer, position + InternalRegion.HEADER_SIZE); + channel, payloadBuffer, fileOffset + InternalRegion.HEADER_SIZE); boolean[] released = new boolean[numBuffers]; payloadBuffer.flip(); for (int i = 0; i < numBuffers; i++) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java index 2e49c60699a..856065a0421 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java @@ -105,7 +105,7 @@ public class NettyShuffleEnvironmentConfiguration { private final int maxOverdraftBuffersPerGate; - private final int hybridShuffleSpilledIndexSegmentSize; + private final int hybridShuffleSpilledIndexRegionGroupSize; private final long hybridShuffleNumRetainedInMemoryRegionsMax; @@ -134,7 +134,7 @@ public class NettyShuffleEnvironmentConfiguration { int maxNumberOfConnections, boolean connectionReuseEnabled, int maxOverdraftBuffersPerGate, - int hybridShuffleSpilledIndexSegmentSize, + int 
hybridShuffleSpilledIndexRegionGroupSize, long hybridShuffleNumRetainedInMemoryRegionsMax, @Nullable TieredStorageConfiguration tieredStorageConfiguration) { @@ -160,7 +160,7 @@ public class NettyShuffleEnvironmentConfiguration { this.maxNumberOfConnections = maxNumberOfConnections; this.connectionReuseEnabled = connectionReuseEnabled; this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate; - this.hybridShuffleSpilledIndexSegmentSize = hybridShuffleSpilledIndexSegmentSize; + this.hybridShuffleSpilledIndexRegionGroupSize = hybridShuffleSpilledIndexRegionGroupSize; this.hybridShuffleNumRetainedInMemoryRegionsMax = hybridShuffleNumRetainedInMemoryRegionsMax; this.tieredStorageConfiguration = tieredStorageConfiguration; @@ -264,8 +264,8 @@ public class NettyShuffleEnvironmentConfiguration { return hybridShuffleNumRetainedInMemoryRegionsMax; } - public int getHybridShuffleSpilledIndexSegmentSize() { - return hybridShuffleSpilledIndexSegmentSize; + public int getHybridShuffleSpilledIndexRegionGroupSize() { + return hybridShuffleSpilledIndexRegionGroupSize; } public TieredStorageConfiguration getTieredStorageConfiguration() { @@ -376,7 +376,8 @@ public class NettyShuffleEnvironmentConfiguration { int hybridShuffleSpilledIndexSegmentSize = configuration.get( - NettyShuffleEnvironmentOptions.HYBRID_SHUFFLE_SPILLED_INDEX_SEGMENT_SIZE); + NettyShuffleEnvironmentOptions + .HYBRID_SHUFFLE_SPILLED_INDEX_REGION_GROUP_SIZE); long hybridShuffleNumRetainedInMemoryRegionsMax = configuration.get( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java index 279f12d2782..bbb88314b4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java @@ -25,12 +25,15 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; +import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; +import static org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion.getContainBufferFunction; import static org.assertj.core.api.Assertions.assertThat; /** Test utils for hybrid shuffle mode. */ @@ -73,37 +76,55 @@ public class HybridShuffleTestUtils { return new HsOutputMetrics(new TestCounter(), new TestCounter()); } - public static InternalRegion createSingleUnreleasedRegion( + public static TestingFileDataIndexRegion createSingleTestRegion( int firstBufferIndex, long firstBufferOffset, int numBuffersPerRegion) { - return new InternalRegion( - firstBufferIndex, - firstBufferOffset, - numBuffersPerRegion, - new boolean[numBuffersPerRegion]); + return new TestingFileDataIndexRegion.Builder() + .setGetSizeSupplier(() -> TestingFileDataIndexRegion.REGION_SIZE) + .setContainBufferFunction( + bufferIndex -> + getContainBufferFunction( + bufferIndex, firstBufferIndex, numBuffersPerRegion)) + .setGetFirstBufferIndexSupplier(() -> firstBufferIndex) + .setGetRegionFileOffsetSupplier(() -> firstBufferOffset) + .setGetNumBuffersSupplier(() -> numBuffersPerRegion) + .build(); } - public static List<InternalRegion> createAllUnreleasedRegions( + public static List<TestingFileDataIndexRegion> createTestRegions( int firstBufferIndex, 
long firstBufferOffset, int numBuffersPerRegion, int numRegions) { - List<InternalRegion> regions = new ArrayList<>(); + List<TestingFileDataIndexRegion> regions = new ArrayList<>(); int bufferIndex = firstBufferIndex; long bufferOffset = firstBufferOffset; + int numRegionSize = TestingFileDataIndexRegion.REGION_SIZE; for (int i = 0; i < numRegions; i++) { + final int currentBufferIndex = bufferIndex; + final long currentBufferOffset = bufferOffset; regions.add( - new InternalRegion( - bufferIndex, - bufferOffset, - numBuffersPerRegion, - new boolean[numBuffersPerRegion])); + new TestingFileDataIndexRegion.Builder() + .setGetSizeSupplier(() -> numRegionSize) + .setGetFirstBufferIndexSupplier(() -> currentBufferIndex) + .setGetRegionFileOffsetSupplier(() -> currentBufferOffset) + .setGetNumBuffersSupplier(() -> numBuffersPerRegion) + .setContainBufferFunction( + index -> + getContainBufferFunction( + index, firstBufferIndex, numBuffersPerRegion)) + .build()); bufferIndex += numBuffersPerRegion; bufferOffset += bufferOffset; } return regions; } - public static void assertRegionEquals(InternalRegion expected, InternalRegion region) { + public static void assertRegionEquals( + FileDataIndexRegionHelper.Region expected, FileDataIndexRegionHelper.Region region) { assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex()); - assertThat(region.getFirstBufferOffset()).isEqualTo(expected.getFirstBufferOffset()); - assertThat(region.getNumBuffers()).isEqualTo(region.getNumBuffers()); - assertThat(region.getReleased()).isEqualTo(region.getReleased()); + assertThat(region.getRegionFileOffset()).isEqualTo(expected.getRegionFileOffset()); + assertThat(region.getNumBuffers()).isEqualTo(expected.getNumBuffers()); + if (expected instanceof InternalRegion) { + assertThat(region).isInstanceOf(InternalRegion.class); + assertThat(((InternalRegion) region).getReleased()) + .isEqualTo(((InternalRegion) expected).getReleased()); + } } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java index 0c287c9fb25..2971214c0bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java @@ -18,7 +18,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid; -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager; import javax.annotation.Nullable; @@ -29,17 +30,17 @@ import java.util.List; import java.util.TreeMap; import java.util.function.BiConsumer; -/** Mock {@link HsFileDataIndexSpilledRegionManager} for testing. */ -public class TestingFileDataIndexSpilledRegionManager - implements HsFileDataIndexSpilledRegionManager { - private final List<TreeMap<Integer, InternalRegion>> regions; +/** Mock {@link FileDataIndexSpilledRegionManager} for testing. 
*/ +public class TestingFileDataIndexSpilledRegionManager<T extends FileDataIndexRegionHelper.Region> + implements FileDataIndexSpilledRegionManager<T> { + private final List<TreeMap<Integer, T>> regions; - private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer; + private final BiConsumer<Integer, T> cacheRegionConsumer; private int findRegionInvoked = 0; public TestingFileDataIndexSpilledRegionManager( - int numSubpartitions, BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { + int numSubpartitions, BiConsumer<Integer, T> cacheRegionConsumer) { this.regions = new ArrayList<>(); this.cacheRegionConsumer = cacheRegionConsumer; for (int i = 0; i < numSubpartitions; i++) { @@ -48,7 +49,7 @@ public class TestingFileDataIndexSpilledRegionManager } @Nullable - public InternalRegion getRegion(int subpartition, int bufferIndex) { + public T getRegion(int subpartition, int bufferIndex) { return regions.get(subpartition).get(bufferIndex); } @@ -61,15 +62,14 @@ public class TestingFileDataIndexSpilledRegionManager } @Override - public void appendOrOverwriteRegion(int subpartition, InternalRegion region) - throws IOException { + public void appendOrOverwriteRegion(int subpartition, T region) { regions.get(subpartition).put(region.getFirstBufferIndex(), region); } @Override public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) { findRegionInvoked++; - InternalRegion region = regions.get(subpartition).get(bufferIndex); + T region = regions.get(subpartition).get(bufferIndex); if (region == null) { return -1; } else { @@ -87,22 +87,22 @@ public class TestingFileDataIndexSpilledRegionManager } /** Factory of {@link TestingFileDataIndexSpilledRegionManager}. 
*/ - public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory { - public static final Factory INSTANCE = new Factory(); + public static class Factory<T extends FileDataIndexRegionHelper.Region> + implements FileDataIndexSpilledRegionManager.Factory<T> { - public TestingFileDataIndexSpilledRegionManager lastSpilledRegionManager; + public TestingFileDataIndexSpilledRegionManager<T> lastSpilledRegionManager; - public TestingFileDataIndexSpilledRegionManager getLastSpilledRegionManager() { + public TestingFileDataIndexSpilledRegionManager<T> getLastSpilledRegionManager() { return lastSpilledRegionManager; } @Override - public HsFileDataIndexSpilledRegionManager create( + public FileDataIndexSpilledRegionManager<T> create( int numSubpartitions, Path indexFilePath, - BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { - TestingFileDataIndexSpilledRegionManager testingFileDataIndexSpilledRegionManager = - new TestingFileDataIndexSpilledRegionManager( + BiConsumer<Integer, T> cacheRegionConsumer) { + TestingFileDataIndexSpilledRegionManager<T> testingFileDataIndexSpilledRegionManager = + new TestingFileDataIndexSpilledRegionManager<>( numSubpartitions, cacheRegionConsumer); lastSpilledRegionManager = testingFileDataIndexSpilledRegionManager; return testingFileDataIndexSpilledRegionManager; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java similarity index 62% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java index 5e1f79dce32..1ffd3620975 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.partition.hybrid; +package org.apache.flink.runtime.io.network.partition.hybrid.index; -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.TestingFileDataIndexSpilledRegionManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,44 +31,44 @@ import java.util.Optional; import java.util.UUID; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; -import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestRegions; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link HsFileDataIndexCache}. */ -class HsFileDataIndexCacheTest { - private HsFileDataIndexCache indexCache; +/** Tests for {@link FileDataIndexCache}. 
*/ +class FileDataIndexCacheTest { + private FileDataIndexCache<TestingFileDataIndexRegion> indexCache; - private TestingFileDataIndexSpilledRegionManager spilledRegionManager; + private TestingFileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> + spilledRegionManager; private final int numSubpartitions = 1; - private static final int SPILLED_INDEX_SEGMENT_SIZE = 256; - private int numRetainedIndexEntry = 10; @BeforeEach void before(@TempDir Path tmpPath) throws Exception { Path indexFilePath = Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion> + testingSpilledRegionManagerFactory = + new TestingFileDataIndexSpilledRegionManager.Factory<>(); indexCache = - new HsFileDataIndexCache( + new FileDataIndexCache<>( numSubpartitions, indexFilePath, numRetainedIndexEntry, - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); - spilledRegionManager = - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE - .getLastSpilledRegionManager(); + testingSpilledRegionManagerFactory); + spilledRegionManager = testingSpilledRegionManagerFactory.getLastSpilledRegionManager(); } @Test void testPutAndGet() { - indexCache.put(0, createAllUnreleasedRegions(0, 0L, 3, 1)); - Optional<InternalRegion> regionOpt = indexCache.get(0, 0); + indexCache.put(0, createTestRegions(0, 0L, 3, 1)); + Optional<TestingFileDataIndexRegion> regionOpt = indexCache.get(0, 0); assertThat(regionOpt) .hasValueSatisfying( (region) -> { assertThat(region.getFirstBufferIndex()).isEqualTo(0); - assertThat(region.getFirstBufferOffset()).isEqualTo(0); + assertThat(region.getRegionFileOffset()).isEqualTo(0); assertThat(region.getNumBuffers()).isEqualTo(3); }); } @@ -77,38 +77,39 @@ class HsFileDataIndexCacheTest { void testCachedRegionRemovedWhenExceedsRetainedEntry(@TempDir Path tmpPath) throws Exception { numRetainedIndexEntry = 3; Path indexFilePath = 
Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion> + testingSpilledRegionManagerFactory = + new TestingFileDataIndexSpilledRegionManager.Factory<>(); indexCache = - new HsFileDataIndexCache( + new FileDataIndexCache<>( numSubpartitions, indexFilePath, numRetainedIndexEntry, - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); - spilledRegionManager = - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE - .getLastSpilledRegionManager(); + testingSpilledRegionManagerFactory); + spilledRegionManager = testingSpilledRegionManagerFactory.getLastSpilledRegionManager(); // region 0, 1, 2 - List<InternalRegion> regionList = createAllUnreleasedRegions(0, 0L, 3, 3); + List<TestingFileDataIndexRegion> regionList = createTestRegions(0, 0L, 3, 3); indexCache.put(0, regionList); assertThat(spilledRegionManager.getSpilledRegionSize(0)).isZero(); // number of regions exceeds numRetainedIndexEntry, trigger cache purge. - indexCache.put(0, createAllUnreleasedRegions(9, 9L, 3, 1)); + indexCache.put(0, createTestRegions(9, 9L, 3, 1)); assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1); - InternalRegion region = spilledRegionManager.getRegion(0, 0); + TestingFileDataIndexRegion region = spilledRegionManager.getRegion(0, 0); assertThat(region).isNotNull(); assertRegionEquals(region, regionList.get(0)); // number of regions exceeds numRetainedIndexEntry, trigger cache purge. 
- indexCache.put(0, createAllUnreleasedRegions(12, 12L, 3, 1)); + indexCache.put(0, createTestRegions(12, 12L, 3, 1)); assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2); - InternalRegion region2 = spilledRegionManager.getRegion(0, 3); + TestingFileDataIndexRegion region2 = spilledRegionManager.getRegion(0, 3); assertThat(region2).isNotNull(); assertRegionEquals(region2, regionList.get(1)); } @Test void testGetNonExistentRegion() { - Optional<InternalRegion> region = indexCache.get(0, 0); + Optional<TestingFileDataIndexRegion> region = indexCache.get(0, 0); // get a non-existent region. assertThat(region).isNotPresent(); } @@ -117,21 +118,22 @@ class HsFileDataIndexCacheTest { void testCacheLoadSpilledRegion(@TempDir Path tmpPath) throws Exception { numRetainedIndexEntry = 1; Path indexFilePath = Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + TestingFileDataIndexSpilledRegionManager.Factory<TestingFileDataIndexRegion> + testingSpilledRegionManagerFactory = + new TestingFileDataIndexSpilledRegionManager.Factory<>(); indexCache = - new HsFileDataIndexCache( + new FileDataIndexCache<>( numSubpartitions, indexFilePath, numRetainedIndexEntry, - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); - spilledRegionManager = - TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE - .getLastSpilledRegionManager(); + testingSpilledRegionManagerFactory); + spilledRegionManager = testingSpilledRegionManagerFactory.getLastSpilledRegionManager(); - indexCache.put(0, createAllUnreleasedRegions(0, 0L, 1, 2)); + indexCache.put(0, createTestRegions(0, 0L, 1, 2)); assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1); assertThat(spilledRegionManager.getFindRegionInvoked()).isZero(); - Optional<InternalRegion> regionOpt = indexCache.get(0, 0); + Optional<TestingFileDataIndexRegion> regionOpt = indexCache.get(0, 0); assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2); assertThat(regionOpt).isPresent(); 
assertThat(spilledRegionManager.getFindRegionInvoked()).isEqualTo(1); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java similarity index 51% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java index bb4c31a4f2d..e488a7f2967 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImplTest.java @@ -16,9 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.io.network.partition.hybrid; - -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +package org.apache.flink.runtime.io.network.partition.hybrid.index; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,13 +32,13 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; -import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions; -import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion; -import static org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleTestRegion; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestRegions; +import static org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion.readRegionFromFile; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link HsFileDataIndexSpilledRegionManagerImpl}. */ -class HsFileDataIndexSpilledRegionManagerImplTest { +/** Tests for {@link FileDataIndexSpilledRegionManagerImpl}. 
*/ +class FileDataIndexSpilledRegionManagerImplTest { private Path indexFilePath; @BeforeEach @@ -51,7 +49,7 @@ class HsFileDataIndexSpilledRegionManagerImplTest { @Test void testFindNonExistentRegion() throws Exception { CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>(); - try (HsFileDataIndexSpilledRegionManager spilledRegionManager = + try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager = createSpilledRegionManager( (ignore1, ignore2) -> cachedRegionFuture.complete(null))) { long regionOffset = spilledRegionManager.findRegion(0, 0, true); @@ -63,114 +61,100 @@ class HsFileDataIndexSpilledRegionManagerImplTest { @Test void testAppendOrOverwriteRegion() throws Exception { CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>(); - try (HsFileDataIndexSpilledRegionManager spilledRegionManager = + try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager = createSpilledRegionManager( (ignore1, ignore2) -> cachedRegionFuture.complete(null))) { - InternalRegion region = createSingleUnreleasedRegion(0, 0L, 1); + TestingFileDataIndexRegion region = createSingleTestRegion(0, 0L, 1); // append region to index file. spilledRegionManager.appendOrOverwriteRegion(0, region); assertThat(cachedRegionFuture).isNotCompleted(); FileChannel indexFileChannel = FileChannel.open(indexFilePath, StandardOpenOption.READ); - InternalRegion readRegion = - InternalRegionWriteReadUtils.readRegionFromFile( - indexFileChannel, - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE), - 0L); + TestingFileDataIndexRegion readRegion = readRegionFromFile(indexFileChannel, 0L); assertRegionEquals(readRegion, region); // new region must have the same size of old region. - InternalRegion newRegion = createSingleUnreleasedRegion(0, 10L, 1); + TestingFileDataIndexRegion newRegion = createSingleTestRegion(0, 10L, 1); // overwrite old region. 
spilledRegionManager.appendOrOverwriteRegion(0, newRegion); // appendOrOverwriteRegion will not trigger cache load. assertThat(cachedRegionFuture).isNotCompleted(); - InternalRegion readNewRegion = - InternalRegionWriteReadUtils.readRegionFromFile( - indexFileChannel, - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE), - 0L); + TestingFileDataIndexRegion readNewRegion = readRegionFromFile(indexFileChannel, 0L); assertRegionEquals(readNewRegion, newRegion); } } @Test - void testWriteMoreThanOneSegment() throws Exception { - List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 2, 2); - int segmentSize = regions.stream().mapToInt(InternalRegion::getSize).sum() + 1; - try (HsFileDataIndexSpilledRegionManager spilledRegionManager = - createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) { + void testWriteMoreThanOneRegionGroup() throws Exception { + List<TestingFileDataIndexRegion> regions = createTestRegions(0, 0L, 2, 2); + int regionGroupSize = + regions.stream().mapToInt(TestingFileDataIndexRegion::getSize).sum() + 1; + try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager = + createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) -> {})) { spilledRegionManager.appendOrOverwriteRegion(0, regions.get(0)); spilledRegionManager.appendOrOverwriteRegion(0, regions.get(1)); - // segment has no enough space, will start new segment. - InternalRegion regionInNewSegment = createSingleUnreleasedRegion(4, 4L, 2); - spilledRegionManager.appendOrOverwriteRegion(0, regionInNewSegment); + // region group has no enough space, will start new region group. 
+ TestingFileDataIndexRegion regionInNewRegionGroup = createSingleTestRegion(4, 4L, 2); + spilledRegionManager.appendOrOverwriteRegion(0, regionInNewRegionGroup); FileChannel indexFileChannel = FileChannel.open(indexFilePath, StandardOpenOption.READ); - InternalRegion readRegion = - InternalRegionWriteReadUtils.readRegionFromFile( + TestingFileDataIndexRegion readRegion = + readRegionFromFile( indexFileChannel, - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE), - // offset is segment size instead of two regions size to prove that new - // segment is started. - segmentSize); - assertRegionEquals(readRegion, regionInNewSegment); + // offset is region group size instead of two regions size to prove that + // new region group is started. + regionGroupSize); + assertRegionEquals(readRegion, regionInNewRegionGroup); } } @Test void testWriteBigRegion() throws Exception { - int segmentSize = 4; - try (HsFileDataIndexSpilledRegionManager spilledRegionManager = - createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) { - List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 1, 2); - InternalRegion region1 = regions.get(0); - InternalRegion region2 = regions.get(1); - assertThat(region1.getSize()).isGreaterThan(segmentSize); - assertThat(region2.getSize()).isGreaterThan(segmentSize); + int regionGroupSize = 4; + try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager = + createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) -> {})) { + List<TestingFileDataIndexRegion> regions = createTestRegions(0, 0L, 1, 2); + TestingFileDataIndexRegion region1 = regions.get(0); + TestingFileDataIndexRegion region2 = regions.get(1); + assertThat(region1.getSize()).isGreaterThan(regionGroupSize); + assertThat(region2.getSize()).isGreaterThan(regionGroupSize); spilledRegionManager.appendOrOverwriteRegion(0, region1); spilledRegionManager.appendOrOverwriteRegion(0, region2); FileChannel indexFileChannel = 
FileChannel.open(indexFilePath, StandardOpenOption.READ); - InternalRegion readRegion1 = - InternalRegionWriteReadUtils.readRegionFromFile( - indexFileChannel, - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE), - 0L); + TestingFileDataIndexRegion readRegion1 = readRegionFromFile(indexFileChannel, 0L); assertRegionEquals(readRegion1, region1); - InternalRegion readRegion2 = - InternalRegionWriteReadUtils.readRegionFromFile( - indexFileChannel, - allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE), - readRegion1.getSize()); + TestingFileDataIndexRegion readRegion2 = + readRegionFromFile(indexFileChannel, readRegion1.getSize()); assertRegionEquals(readRegion2, region2); } } @Test - void testFindRegionFirstBufferIndexInMultipleSegments() throws Exception { + void testFindRegionFirstBufferIndexInMultipleRegionGroups() throws Exception { final int numBuffersPerRegion = 2; final int subpartition = 0; - List<InternalRegion> loadedRegions = new ArrayList<>(); - try (HsFileDataIndexSpilledRegionManager spilledRegionManager = + List<TestingFileDataIndexRegion> loadedRegions = new ArrayList<>(); + try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager = createSpilledRegionManager( - // every segment can store two regions. - (InternalRegion.HEADER_SIZE + numBuffersPerRegion) * 2, + // every region group can store two regions. 
+ TestingFileDataIndexRegion.REGION_SIZE * 2, (ignore, region) -> loadedRegions.add(region))) { - // segment1: region1(0-2), region2(9-11), min:0, max: 11 + // region group1: region1(0-1), region2(9-10), min:0, max: 10 spilledRegionManager.appendOrOverwriteRegion( - subpartition, createSingleUnreleasedRegion(0, 0L, numBuffersPerRegion)); + subpartition, createSingleTestRegion(0, 0L, numBuffersPerRegion)); spilledRegionManager.appendOrOverwriteRegion( - subpartition, createSingleUnreleasedRegion(9, 9L, numBuffersPerRegion)); + subpartition, createSingleTestRegion(9, 9L, numBuffersPerRegion)); - // segment2: region1(2-4), region2(11-13) min: 2, max: 13 - InternalRegion targetRegion = createSingleUnreleasedRegion(2, 2L, 2); + // region group2: region1(2-3), region2(11-12) min: 2, max: 12 + TestingFileDataIndexRegion targetRegion = + createSingleTestRegion(2, 2L, numBuffersPerRegion); spilledRegionManager.appendOrOverwriteRegion(subpartition, targetRegion); spilledRegionManager.appendOrOverwriteRegion( - subpartition, createSingleUnreleasedRegion(11, 11L, 2)); + subpartition, createSingleTestRegion(11, 11L, numBuffersPerRegion)); - // segment3: region1(7-9) min: 7, max: 9 + // region group3: region1(7-8) min: 7, max: 8 spilledRegionManager.appendOrOverwriteRegion( - subpartition, createSingleUnreleasedRegion(7, 7L, 2)); + subpartition, createSingleTestRegion(7, 7L, numBuffersPerRegion)); // find target region long regionOffset = spilledRegionManager.findRegion(subpartition, 3, true); @@ -181,15 +165,27 @@ class HsFileDataIndexSpilledRegionManagerImplTest { } } - private HsFileDataIndexSpilledRegionManager createSpilledRegionManager( - BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { + private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> + createSpilledRegionManager( + BiConsumer<Integer, TestingFileDataIndexRegion> cacheRegionConsumer) { return createSpilledRegionManager(256, cacheRegionConsumer); } - private 
HsFileDataIndexSpilledRegionManager createSpilledRegionManager( - int segmentSize, BiConsumer<Integer, InternalRegion> cacheRegionConsumer) { + private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> + createSpilledRegionManager( + int regionGroupSize, + BiConsumer<Integer, TestingFileDataIndexRegion> cacheRegionConsumer) { int numSubpartitions = 2; - return new HsFileDataIndexSpilledRegionManagerImpl.Factory(segmentSize, Long.MAX_VALUE) + return new FileDataIndexSpilledRegionManagerImpl.Factory<>( + regionGroupSize, + Long.MAX_VALUE, + TestingFileDataIndexRegion.REGION_SIZE, + new TestingFileDataIndexRegionHelper.Builder() + .setReadRegionFromFileFunction( + TestingFileDataIndexRegion::readRegionFromFile) + .setWriteRegionToFileConsumer( + TestingFileDataIndexRegion::writeRegionToFile) + .build()) .create(numSubpartitions, indexFilePath, cacheRegionConsumer); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java similarity index 65% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java index e65a8a132d9..fc2cf847002 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java @@ -16,9 +16,9 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.io.network.partition.hybrid; +package org.apache.flink.runtime.io.network.partition.hybrid.index; -import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -34,16 +34,15 @@ import java.util.UUID; import static org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; -import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link InternalRegionWriteReadUtils}. */ -class InternalRegionWriteReadUtilsTest { +/** Tests for {@link FileRegionWriteReadUtils}. 
*/ +class FileRegionWriteReadUtilsTest { @Test void testAllocateAndConfigureBuffer() { final int bufferSize = 16; - ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize); assertThat(buffer.capacity()).isEqualTo(16); assertThat(buffer.limit()).isEqualTo(16); assertThat(buffer.position()).isZero(); @@ -52,27 +51,30 @@ class InternalRegionWriteReadUtilsTest { } @Test - void testReadPrematureEndOfFile(@TempDir Path tmpPath) throws Exception { + void testReadPrematureEndOfFileForHsInternalRegion(@TempDir Path tmpPath) throws Exception { FileChannel channel = tmpFileChannel(tmpPath); - ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); - InternalRegionWriteReadUtils.writeRegionToFile( - channel, buffer, createSingleUnreleasedRegion(0, 0L, 1)); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + FileRegionWriteReadUtils.writeHsInternalRegionToFile( + channel, buffer, new HsFileDataIndexImpl.InternalRegion(0, 0, 1, new boolean[1])); channel.truncate(channel.position() - 1); buffer.flip(); assertThatThrownBy( - () -> InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L)) + () -> + FileRegionWriteReadUtils.readHsInternalRegionFromFile( + channel, buffer, 0L)) .isInstanceOf(IOException.class); } @Test - void testWriteAndReadRegion(@TempDir Path tmpPath) throws Exception { + void testWriteAndReadHsInternalRegion(@TempDir Path tmpPath) throws Exception { FileChannel channel = tmpFileChannel(tmpPath); - ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); - InternalRegion region = createSingleUnreleasedRegion(10, 100L, 1); - InternalRegionWriteReadUtils.writeRegionToFile(channel, buffer, region); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + HsFileDataIndexImpl.InternalRegion region = + new 
HsFileDataIndexImpl.InternalRegion(0, 0, 1, new boolean[1]); + FileRegionWriteReadUtils.writeHsInternalRegionToFile(channel, buffer, region); buffer.flip(); - InternalRegion readRegion = - InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L); + HsFileDataIndexImpl.InternalRegion readRegion = + FileRegionWriteReadUtils.readHsInternalRegionFromFile(channel, buffer, 0L); assertRegionEquals(readRegion, region); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java new file mode 100644 index 00000000000..29ee310fba8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.index; + +import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer; + +/** Testing implementation for {@link FileDataIndexRegionHelper.Region}. Every accessor delegates to a supplier or function injected through {@link Builder}; the static helpers write and read a fixed-size record of {@link #REGION_SIZE} bytes. */ +public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Region { + + /** Size in bytes of one serialized region: two ints plus one long, matching the fields that writeRegionToFile puts into the buffer. */ public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + Integer.BYTES; + + /* The five callbacks below back the correspondingly named accessors of this class. */ private final Supplier<Integer> getSizeSupplier; + + private final Supplier<Integer> getFirstBufferIndexSupplier; + + private final Supplier<Long> getRegionFileOffsetSupplier; + + private final Supplier<Integer> getNumBuffersSupplier; + + private final Function<Integer, Boolean> containBufferFunction; + + /** Private; instances are obtained from {@link Builder#build()}. */ private TestingFileDataIndexRegion( + Supplier<Integer> getSizeSupplier, + Supplier<Integer> getFirstBufferIndexSupplier, + Supplier<Long> getRegionFileOffsetSupplier, + Supplier<Integer> getNumBuffersSupplier, + Function<Integer, Boolean> containBufferFunction) { + this.getSizeSupplier = getSizeSupplier; + this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier; + this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier; + this.getNumBuffersSupplier = getNumBuffersSupplier; + this.containBufferFunction = containBufferFunction; + } + + @Override + public int getSize() { + return getSizeSupplier.get(); + } + + @Override + public int getFirstBufferIndex() { + return getFirstBufferIndexSupplier.get(); + } + + @Override + public long getRegionFileOffset() { + return getRegionFileOffsetSupplier.get(); + } + + @Override + public int getNumBuffers() { + return getNumBuffersSupplier.get(); + } + + /** Returns the result of applying the injected function to the given buffer index. */ @Override + public boolean containBuffer(int bufferIndex) { + 
return containBufferFunction.apply(bufferIndex); + } + + /** Serializes the region as first buffer index (int), number of buffers (int) and region file offset (long), in that order, then passes the flipped buffer to {@code BufferReaderWriterUtil.writeBuffers}. */ public static void writeRegionToFile(FileChannel channel, TestingFileDataIndexRegion region) + throws IOException { + ByteBuffer regionBuffer = allocateHeaderBuffer(); + regionBuffer.clear(); + regionBuffer.putInt(region.getFirstBufferIndex()); + regionBuffer.putInt(region.getNumBuffers()); + regionBuffer.putLong(region.getRegionFileOffset()); + regionBuffer.flip(); + BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), regionBuffer); + } + + /** Fills a {@link #REGION_SIZE}-byte buffer through {@code BufferReaderWriterUtil.readByteBufferFully} using the given file offset, then decodes the fields in the order written by writeRegionToFile. The returned region always reports {@link #REGION_SIZE} as its size. */ public static TestingFileDataIndexRegion readRegionFromFile( + FileChannel channel, long fileOffset) throws IOException { + ByteBuffer regionBuffer = allocateHeaderBuffer(); + regionBuffer.clear(); + BufferReaderWriterUtil.readByteBufferFully(channel, regionBuffer, fileOffset); + regionBuffer.flip(); + int firstBufferIndex = regionBuffer.getInt(); + int numBuffers = regionBuffer.getInt(); + /* exposed below as the region file offset of the returned region */ long firstBufferOffset = regionBuffer.getLong(); + return new TestingFileDataIndexRegion.Builder() + .setGetSizeSupplier(() -> TestingFileDataIndexRegion.REGION_SIZE) + .setGetFirstBufferIndexSupplier(() -> firstBufferIndex) + .setGetRegionFileOffsetSupplier(() -> firstBufferOffset) + .setGetNumBuffersSupplier(() -> numBuffers) + .setContainBufferFunction( + bufferIndex -> + getContainBufferFunction(bufferIndex, firstBufferIndex, numBuffers)) + .build(); + } + + /** Requests a buffer sized {@link #REGION_SIZE} from allocateAndConfigureBuffer. */ private static ByteBuffer allocateHeaderBuffer() { + return allocateAndConfigureBuffer(TestingFileDataIndexRegion.REGION_SIZE); + } + + /** Returns whether bufferIndex lies in the half-open range that starts at firstBufferIndex and spans numBuffers entries. Despite its name, this returns the check result itself rather than a function. */ public static boolean getContainBufferFunction( + int bufferIndex, int firstBufferIndex, int numBuffers) { + return bufferIndex >= firstBufferIndex && bufferIndex < firstBufferIndex + numBuffers; + } + + /** Builder for {@link TestingFileDataIndexRegion}. Unless overridden, the numeric suppliers yield zero and the containBuffer function yields false.
*/ + public static class Builder { + + private Supplier<Integer> getSizeSupplier = () -> 0; + + private Supplier<Integer> getFirstBufferIndexSupplier = () -> 0; + + private Supplier<Long> getRegionFileOffsetSupplier = () -> 0L; + + private Supplier<Integer> getNumBuffersSupplier = () -> 0; + + private Function<Integer, Boolean> containBufferFunction = bufferIndex -> false; + + /* Each setter below replaces one callback and returns this builder so calls can be chained. */ public TestingFileDataIndexRegion.Builder setGetSizeSupplier( + Supplier<Integer> getSizeSupplier) { + this.getSizeSupplier = getSizeSupplier; + return this; + } + + public TestingFileDataIndexRegion.Builder setGetFirstBufferIndexSupplier( + Supplier<Integer> getFirstBufferIndexSupplier) { + this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier; + return this; + } + + public TestingFileDataIndexRegion.Builder setGetRegionFileOffsetSupplier( + Supplier<Long> getRegionFileOffsetSupplier) { + this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier; + return this; + } + + public TestingFileDataIndexRegion.Builder setGetNumBuffersSupplier( + Supplier<Integer> getNumBuffersSupplier) { + this.getNumBuffersSupplier = getNumBuffersSupplier; + return this; + } + + public TestingFileDataIndexRegion.Builder setContainBufferFunction( + Function<Integer, Boolean> containBufferFunction) { + this.containBufferFunction = containBufferFunction; + return this; + } + + /** Creates the region from the currently configured callbacks. */ public TestingFileDataIndexRegion build() { + return new TestingFileDataIndexRegion( + getSizeSupplier, + getFirstBufferIndexSupplier, + getRegionFileOffsetSupplier, + getNumBuffersSupplier, + containBufferFunction); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java new file mode 100644 index 00000000000..3a36e3ca6ac --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegionHelper.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.index; + +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.BiFunctionWithException; + +import java.io.IOException; +import java.nio.channels.FileChannel; + +/** Testing implementation for {@link FileDataIndexRegionHelper}. 
*/ +public class TestingFileDataIndexRegionHelper + implements FileDataIndexRegionHelper<TestingFileDataIndexRegion> { + + /** Callback that {@link #writeRegionToFile} forwards to. */ private final BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, IOException> + writeRegionToFileConsumer; + + /** Callback that {@link #readRegionFromFile} forwards to. */ private final BiFunctionWithException< + FileChannel, Long, TestingFileDataIndexRegion, IOException> + readRegionFromFileFunction; + + /** Private; instances are created through {@link Builder#build()}. */ private TestingFileDataIndexRegionHelper( + BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, IOException> + writeRegionToFileConsumer, + BiFunctionWithException<FileChannel, Long, TestingFileDataIndexRegion, IOException> + readRegionFromFileFunction) { + this.writeRegionToFileConsumer = writeRegionToFileConsumer; + this.readRegionFromFileFunction = readRegionFromFileFunction; + } + + /** Passes the channel and region to the configured write callback; an IOException thrown by the callback propagates to the caller. */ @Override + public void writeRegionToFile(FileChannel channel, TestingFileDataIndexRegion region) + throws IOException { + writeRegionToFileConsumer.accept(channel, region); + } + + /** Returns whatever the configured read callback produces for the given channel and file offset; with the Builder default that is null. */ @Override + public TestingFileDataIndexRegion readRegionFromFile(FileChannel channel, long fileOffset) + throws IOException { + return readRegionFromFileFunction.apply(channel, fileOffset); + } + + /** Builder for {@link TestingFileDataIndexRegionHelper}. Unless overridden, the write callback does nothing and the read callback returns null.
*/ + public static class Builder { + /* default: ignore the write request */ private BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, IOException> + writeRegionToFileConsumer = (fileChannel, testRegion) -> {}; + + /* default: no region is produced */ private BiFunctionWithException<FileChannel, Long, TestingFileDataIndexRegion, IOException> + readRegionFromFileFunction = (fileChannel, fileOffset) -> null; + + /** Replaces the write callback and returns this builder for chaining. */ public TestingFileDataIndexRegionHelper.Builder setWriteRegionToFileConsumer( + BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, IOException> + writeRegionToFileConsumer) { + this.writeRegionToFileConsumer = writeRegionToFileConsumer; + return this; + } + + /** Replaces the read callback and returns this builder for chaining. */ public TestingFileDataIndexRegionHelper.Builder setReadRegionFromFileFunction( + BiFunctionWithException<FileChannel, Long, TestingFileDataIndexRegion, IOException> + readRegionFromFileFunction) { + this.readRegionFromFileFunction = readRegionFromFileFunction; + return this; + } + + /** Creates the helper from the currently configured callbacks. */ public TestingFileDataIndexRegionHelper build() { + return new TestingFileDataIndexRegionHelper( + writeRegionToFileConsumer, readRegionFromFileFunction); + } + } +}
