This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 910dc1b0babd6b00eea5b7052de5a20d5babfe28
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jan 5 15:03:34 2023 +0800

    [FLINK-30332][network] Introduce HsFileDataIndexSpilledRegionManager to manage spilled regions.
---
 .../partition/hybrid/HsFileDataIndexImpl.java      |   2 +-
 .../HsFileDataIndexSpilledRegionManager.java       |  58 +++
 .../HsFileDataIndexSpilledRegionManagerImpl.java   | 402 +++++++++++++++++++++
 ...sFileDataIndexSpilledRegionManagerImplTest.java | 195 ++++++++++
 .../partition/hybrid/HybridShuffleTestUtils.java   |  37 ++
 .../TestingFileDataIndexSpilledRegionManager.java  | 111 ++++++
 6 files changed, 804 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index de0081b559d..5249b644b5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -198,7 +198,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex {
             this.released = released;
         }
 
-        private boolean containBuffer(int bufferIndex) {
+        boolean containBuffer(int bufferIndex) {
             return bufferIndex >= firstBufferIndex
                     && bufferIndex < firstBufferIndex + numBuffers;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
new file mode 100644
index 00000000000..e0a413a5c9f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.function.BiConsumer;
+
+/** This class is responsible for spilling regions to disk and managing these spilled regions. */
+public interface HsFileDataIndexSpilledRegionManager extends AutoCloseable {
+    /**
+     * Write this region to the index file. If the target region was already spilled, overwrite
+     * it.
+     *
+     * @param subpartition the subpartition id of this region.
+     * @param region the region to be spilled to the index file.
+     */
+    void appendOrOverwriteRegion(int subpartition, InternalRegion region) throws IOException;
+
+    /**
+     * Find the region that contains the target bufferIndex and belongs to the target
+     * subpartition.
+     *
+     * @param subpartition the subpartition id that the target region belongs to.
+     * @param bufferIndex the buffer index that the target region contains.
+     * @param loadToCache whether to load the found region into the cache.
+     * @return if the target region can be found, return its offset in the index file. Otherwise,
+     *     return -1.
+     */
+    long findRegion(int subpartition, int bufferIndex, boolean loadToCache);
+
+    /** Close this spilled region manager. */
+    void close() throws IOException;
+
+    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
+    interface Factory {
+        HsFileDataIndexSpilledRegionManager create(
+                int numSubpartitions,
+                Path indexFilePath,
+                BiConsumer<Integer, InternalRegion> cacheRegionConsumer);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
new file mode 100644
index 00000000000..c87c0242b7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+
+/**
+ * Default implementation of {@link HsFileDataIndexSpilledRegionManager}. This manager handles and
+ * spills regions in the following way:
+ *
+ * <ul>
+ *   <li>All regions are written to the same file, namely the index file.
+ *   <li>Multiple regions belonging to the same subpartition form a segment.
+ *   <li>The regions in the same segment have no special relationship; their order only reflects
+ *       the order in which they were spilled.
+ *   <li>Each segment is independent. Even if the previous segment is not full, the next segment
+ *       can still be allocated.
+ *   <li>If a region has already been written to the index file, spilling it again will overwrite
+ *       the previous region.
+ *   <li>A very large region will monopolize a single segment.
+ * </ul>
+ *
+ * <p>The relationships between the index file and segments are shown below.
+ *
+ * <pre>
+ *
+ *     - - - - - - - - - - - Index File - - - - - - - - - - - -
+ *     |                                                       |
+ *     |  - - - - Segment1 - - - -    - - - - Segment2 - - - - |
+ *     | |SP1 R1||SP1 R2|  Free   |  |SP2 R3| SP2 R1| SP2 R2 | |
+ *     |  - - - - - - - - - - - -     - - - - - - - - - - - -  |
+ *     |                                                       |
+ *     |  - - - - - - - Segment3 - - - - - - -                 |
+ *     | |             Big Region             |                |
+ *     |  - - - - - - - - - - - - - - - - - -                  |
+ *     - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * </pre>
+ */
+public class HsFileDataIndexSpilledRegionManagerImpl
+        implements HsFileDataIndexSpilledRegionManager {
+
+    /** Reusable buffer used to read and write the immutable part of a region. */
+    private final ByteBuffer regionHeaderBuffer =
+            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE);
+
+    /**
+     * List of every subpartition's segment metas. Each element is a TreeMap containing all {@link
+     * SegmentMeta}s of the subpartition corresponding to the subscript. The value of this TreeMap
+     * is a {@link SegmentMeta}, and the key is the minBufferIndex of that segment. Only finished
+     * (i.e. no longer appended) segments are put here.
+     */
+    private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas;
+
+    private FileChannel channel;
+
+    /** The offset of the next segment; a new segment will start from this offset. */
+    private long nextSegmentOffset = 0L;
+
+    private final long[] subpartitionCurrentOffset;
+
+    /** Free space of every subpartition's current segment. */
+    private final int[] subpartitionFreeSpaceInBytes;
+
+    /** Metadata of every subpartition's current segment. */
+    private final SegmentMeta[] currentSegmentMeta;
+
+    /**
+     * Default size of a segment. If the size of a region is larger than this value, the region
+     * will be allocated to and occupy a single segment.
+     */
+    private final int segmentSizeInBytes;
+
+    /**
+     * This consumer is used to load a region into the cache. The first parameter is the
+     * subpartition id, and the second parameter is the region to load.
+     */
+    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
+
+    /**
+     * When a region in a segment needs to be loaded into the cache, whether to load all regions
+     * of the entire segment.
+     */
+    private final boolean loadEntireSegmentToCache;
+
+    public HsFileDataIndexSpilledRegionManagerImpl(
+            int numSubpartitions,
+            Path indexFilePath,
+            int segmentSizeInBytes,
+            long maxCacheCapacity,
+            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+        try {
+            this.channel =
+                    FileChannel.open(
+                            indexFilePath,
+                            StandardOpenOption.CREATE_NEW,
+                            StandardOpenOption.READ,
+                            StandardOpenOption.WRITE);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+        this.loadEntireSegmentToCache =
+                shouldLoadEntireSegmentToCache(
+                        numSubpartitions, segmentSizeInBytes, maxCacheCapacity);
+        this.subpartitionFinishedSegmentMetas = new ArrayList<>(numSubpartitions);
+        this.subpartitionCurrentOffset = new long[numSubpartitions];
+        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
+        this.currentSegmentMeta = new SegmentMeta[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            subpartitionFinishedSegmentMetas.add(new TreeMap<>());
+        }
+        this.cacheRegionConsumer = cacheRegionConsumer;
+        this.segmentSizeInBytes = segmentSizeInBytes;
+    }
+
+    @Override
+    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
+        // first of all, try to find the region in the segment currently being written.
+        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
+        if (segmentMeta != null) {
+            long regionOffset =
+                    findRegionInSegment(subpartition, bufferIndex, segmentMeta, loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+
+        // next, try to find the region in the finished segments.
+        TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap =
+                subpartitionFinishedSegmentMetas.get(subpartition);
+        // all segments with a minBufferIndex less than or equal to the target buffer index may
+        // contain the target region.
+        for (SegmentMeta meta :
+                subpartitionSegmentMetaTreeMap.headMap(bufferIndex, true).values()) {
+            long regionOffset = findRegionInSegment(subpartition, bufferIndex, meta, loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+        return -1;
+    }
+
+    private long findRegionInSegment(
+            int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) {
+        if (bufferIndex <= meta.getMaxBufferIndex()) {
+            try {
+                return readSegmentAndLoadToCacheIfNeeded(
+                        subpartition, bufferIndex, meta, loadToCache);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        // -1 indicates that the target region was not found in this segment.
+        return -1;
+    }
+
+    private long readSegmentAndLoadToCacheIfNeeded(
+            int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache)
+            throws IOException {
+        // read all regions belonging to this segment.
+        List<Tuple2<InternalRegion, Long>> regionAndOffsets =
+                readSegment(meta.getOffset(), meta.getNumRegions());
+        // -1 indicates that the target region was not found in this segment.
+        long targetRegionOffset = -1;
+        InternalRegion targetRegion = null;
+        // traverse all regions to find the target.
+        Iterator<Tuple2<InternalRegion, Long>> it = regionAndOffsets.iterator();
+        while (it.hasNext()) {
+            Tuple2<InternalRegion, Long> regionAndOffset = it.next();
+            InternalRegion region = regionAndOffset.f0;
+            // check whether the region contains this buffer.
+            if (region.containBuffer(bufferIndex)) {
+                // the target region was found.
+                targetRegion = region;
+                targetRegionOffset = regionAndOffset.f1;
+                it.remove();
+            }
+        }
+
+        // the target region was found and needs to be loaded into the cache.
+        if (targetRegion != null && loadToCache) {
+            if (loadEntireSegmentToCache) {
+                // first of all, load all regions except the target into the cache.
+                regionAndOffsets.forEach(
+                        (regionAndOffsetTuple) ->
+                                cacheRegionConsumer.accept(subpartition, regionAndOffsetTuple.f0));
+                // load the target region into the cache at the very end; this prevents the
+                // target from being evicted right away.
+                cacheRegionConsumer.accept(subpartition, targetRegion);
+            } else {
+                // only load the target region into the cache.
+                cacheRegionConsumer.accept(subpartition, targetRegion);
+            }
+        }
+        // return the offset of the target region.
+        return targetRegionOffset;
+    }
+
+    @Override
+    public void appendOrOverwriteRegion(int subpartition, InternalRegion newRegion)
+            throws IOException {
+        // This method will only be called when we want to evict a region from the cache. We must
+        // not let the region be reloaded into the cache, otherwise it will lead to an infinite
+        // loop.
+        long oldRegionOffset = findRegion(subpartition, newRegion.getFirstBufferIndex(), false);
+        if (oldRegionOffset != -1) {
+            // if the region already exists in the file, overwrite it.
+            writeRegionToOffset(oldRegionOffset, newRegion);
+        } else {
+            // otherwise, append the region to a segment.
+            appendRegion(subpartition, newRegion);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    private static boolean shouldLoadEntireSegmentToCache(
+            int numSubpartitions, int segmentSizeInBytes, long maxCacheCapacity) {
+        // If the cache can hold at least two segments (one for reading and one for writing) for
+        // each subpartition, it is reasonable to load the entire segment into memory, which can
+        // improve the cache hit rate. On the contrary, if the cache capacity is small, loading a
+        // large number of regions will lead to performance degradation, so only the target region
+        // should be loaded.
+        return ((long) 2 * numSubpartitions * segmentSizeInBytes) / InternalRegion.HEADER_SIZE
+                <= maxCacheCapacity;
+    }
+
+    private void appendRegion(int subpartition, InternalRegion region) throws IOException {
+        int regionSize = region.getSize();
+        // check whether we have enough space to append this region.
+        if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
+            // Not enough free space, start a new segment. Note that if the region is larger than
+            // the segment size, this will start a new segment containing only the big region.
+            startNewSegment(subpartition, Math.max(regionSize, segmentSizeInBytes));
+        }
+        // spill this region to the current offset of the index file.
+        writeRegionToOffset(subpartitionCurrentOffset[subpartition], region);
+        // a new region was appended to the segment, update its metadata.
+        updateSegment(subpartition, region);
+    }
+
+    private void writeRegionToOffset(long offset, InternalRegion region) throws IOException {
+        channel.position(offset);
+        InternalRegionWriteReadUtils.writeRegionToFile(channel, regionHeaderBuffer, region);
+    }
+
+    private void startNewSegment(int subpartition, int newSegmentSize) {
+        SegmentMeta oldSegmentMeta = currentSegmentMeta[subpartition];
+        currentSegmentMeta[subpartition] = new SegmentMeta(nextSegmentOffset);
+        subpartitionCurrentOffset[subpartition] = nextSegmentOffset;
+        nextSegmentOffset += newSegmentSize;
+        subpartitionFreeSpaceInBytes[subpartition] = newSegmentSize;
+        if (oldSegmentMeta != null) {
+            // put the finished segment into subpartitionFinishedSegmentMetas.
+            subpartitionFinishedSegmentMetas
+                    .get(subpartition)
+                    .put(oldSegmentMeta.minBufferIndex, oldSegmentMeta);
+        }
+    }
+
+    private void updateSegment(int subpartition, InternalRegion region) {
+        int regionSize = region.getSize();
+        subpartitionFreeSpaceInBytes[subpartition] -= regionSize;
+        subpartitionCurrentOffset[subpartition] += regionSize;
+        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
+        segmentMeta.addRegion(
+                region.getFirstBufferIndex(),
+                region.getFirstBufferIndex() + region.getNumBuffers() - 1);
+    }
+
+    /**
+     * Read a segment from the index file.
+     *
+     * @param offset offset of this segment.
+     * @param numRegions number of regions in this segment.
+     * @return list of all regions belonging to this segment, along with their offsets.
+     */
+    private List<Tuple2<InternalRegion, Long>> readSegment(long offset, int numRegions)
+            throws IOException {
+        List<Tuple2<InternalRegion, Long>> regionAndOffsets = new ArrayList<>();
+        for (int i = 0; i < numRegions; i++) {
+            InternalRegion region =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            channel, regionHeaderBuffer, offset);
+            regionAndOffsets.add(Tuple2.of(region, offset));
+            offset += region.getSize();
+        }
+        return regionAndOffsets;
+    }
+
+    /**
+     * Metadata of a spilled region segment. When a segment is finished (i.e. no longer appended),
+     * its corresponding {@link SegmentMeta} becomes immutable.
+     */
+    private static class SegmentMeta {
+        /**
+         * Minimum buffer index of this segment. It is the smallest bufferIndex (inclusive) of all
+         * regions belonging to this segment.
+         */
+        private int minBufferIndex;
+
+        /**
+         * Maximum buffer index of this segment. It is the largest bufferIndex (inclusive) of all
+         * regions belonging to this segment.
+         */
+        private int maxBufferIndex;
+
+        /** Number of regions belonging to this segment. */
+        private int numRegions;
+
+        /** The index file offset of this segment. */
+        private final long offset;
+
+        public SegmentMeta(long offset) {
+            this.offset = offset;
+            this.minBufferIndex = Integer.MAX_VALUE;
+            this.maxBufferIndex = 0;
+            this.numRegions = 0;
+        }
+
+        public int getMaxBufferIndex() {
+            return maxBufferIndex;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public int getNumRegions() {
+            return numRegions;
+        }
+
+        public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) {
+            if (firstBufferIndexOfRegion < minBufferIndex) {
+                this.minBufferIndex = firstBufferIndexOfRegion;
+            }
+            if (maxBufferIndexOfRegion > maxBufferIndex) {
+                this.maxBufferIndex = maxBufferIndexOfRegion;
+            }
+            this.numRegions++;
+        }
+    }
+
+    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
+    public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory {
+        private final int segmentSizeInBytes;
+
+        private final long maxCacheCapacity;
+
+        public Factory(int segmentSizeInBytes, long maxCacheCapacity) {
+            this.segmentSizeInBytes = segmentSizeInBytes;
+            this.maxCacheCapacity = maxCacheCapacity;
+        }
+
+        @Override
+        public HsFileDataIndexSpilledRegionManager create(
+                int numSubpartitions,
+                Path indexFilePath,
+                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+            return new HsFileDataIndexSpilledRegionManagerImpl(
+                    numSubpartitions,
+                    indexFilePath,
+                    segmentSizeInBytes,
+                    maxCacheCapacity,
+                    cacheRegionConsumer);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
new file mode 100644
index 00000000000..bb4c31a4f2d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
+import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions;
+import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion;
+import static org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsFileDataIndexSpilledRegionManagerImpl}. */
+class HsFileDataIndexSpilledRegionManagerImplTest {
+    private Path indexFilePath;
+
+    @BeforeEach
+    void before(@TempDir Path tmpPath) {
+        indexFilePath = tmpPath.resolve(UUID.randomUUID().toString());
+    }
+
+    @Test
+    void testFindNonExistentRegion() throws Exception {
+        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
+        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+                createSpilledRegionManager(
+                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
+            long regionOffset = spilledRegionManager.findRegion(0, 0, true);
+            assertThat(regionOffset).isEqualTo(-1);
+            assertThat(cachedRegionFuture).isNotCompleted();
+        }
+    }
+
+    @Test
+    void testAppendOrOverwriteRegion() throws Exception {
+        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
+        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+                createSpilledRegionManager(
+                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
+            InternalRegion region = createSingleUnreleasedRegion(0, 0L, 1);
+            // append the region to the index file.
+            spilledRegionManager.appendOrOverwriteRegion(0, region);
+            assertThat(cachedRegionFuture).isNotCompleted();
+            FileChannel indexFileChannel =
+                    FileChannel.open(indexFilePath, StandardOpenOption.READ);
+            InternalRegion readRegion =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            indexFileChannel,
+                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
+                            0L);
+            assertRegionEquals(readRegion, region);
+
+            // the new region must have the same size as the old region.
+            InternalRegion newRegion = createSingleUnreleasedRegion(0, 10L, 1);
+            // overwrite the old region.
+            spilledRegionManager.appendOrOverwriteRegion(0, newRegion);
+            // appendOrOverwriteRegion will not trigger a cache load.
+            assertThat(cachedRegionFuture).isNotCompleted();
+            InternalRegion readNewRegion =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            indexFileChannel,
+                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
+                            0L);
+            assertRegionEquals(readNewRegion, newRegion);
+        }
+    }
+
+    @Test
+    void testWriteMoreThanOneSegment() throws Exception {
+        List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 2, 2);
+        int segmentSize = regions.stream().mapToInt(InternalRegion::getSize).sum() + 1;
+        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) {
+            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(0));
+            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(1));
+            // the segment does not have enough space, so a new segment will be started.
+            InternalRegion regionInNewSegment = createSingleUnreleasedRegion(4, 4L, 2);
+            spilledRegionManager.appendOrOverwriteRegion(0, regionInNewSegment);
+            FileChannel indexFileChannel =
+                    FileChannel.open(indexFilePath, StandardOpenOption.READ);
+            InternalRegion readRegion =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            indexFileChannel,
+                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
+                            // the offset is the segment size instead of the size of the two
+                            // regions, proving that a new segment was started.
+                            segmentSize);
+            assertRegionEquals(readRegion, regionInNewSegment);
+        }
+    }
+
+    @Test
+    void testWriteBigRegion() throws Exception {
+        int segmentSize = 4;
+        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) {
+            List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 1, 2);
+            InternalRegion region1 = regions.get(0);
+            InternalRegion region2 = regions.get(1);
+            assertThat(region1.getSize()).isGreaterThan(segmentSize);
+            assertThat(region2.getSize()).isGreaterThan(segmentSize);
+
+            spilledRegionManager.appendOrOverwriteRegion(0, region1);
+            spilledRegionManager.appendOrOverwriteRegion(0, region2);
+            FileChannel indexFileChannel =
+                    FileChannel.open(indexFilePath, StandardOpenOption.READ);
+            InternalRegion readRegion1 =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            indexFileChannel,
+                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
+                            0L);
+            assertRegionEquals(readRegion1, region1);
+
+            InternalRegion readRegion2 =
+                    InternalRegionWriteReadUtils.readRegionFromFile(
+                            indexFileChannel,
+                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
+                            readRegion1.getSize());
+            assertRegionEquals(readRegion2, region2);
+        }
+    }
+
+    @Test
+    void testFindRegionFirstBufferIndexInMultipleSegments() throws Exception {
+        final int numBuffersPerRegion = 2;
+        final int subpartition = 0;
+        List<InternalRegion> loadedRegions = new ArrayList<>();
+        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
+                createSpilledRegionManager(
+                        // every segment can store two regions.
+                        (InternalRegion.HEADER_SIZE + numBuffersPerRegion) * 2,
+                        (ignore, region) -> loadedRegions.add(region))) {
+            // segment1: region1(0-2), region2(9-11), min: 0, max: 11
+            spilledRegionManager.appendOrOverwriteRegion(
+                    subpartition, createSingleUnreleasedRegion(0, 0L, numBuffersPerRegion));
+            spilledRegionManager.appendOrOverwriteRegion(
+                    subpartition, createSingleUnreleasedRegion(9, 9L, numBuffersPerRegion));
+
+            // segment2: region1(2-4), region2(11-13), min: 2, max: 13
+            InternalRegion targetRegion = createSingleUnreleasedRegion(2, 2L, 2);
+            spilledRegionManager.appendOrOverwriteRegion(subpartition, targetRegion);
+            spilledRegionManager.appendOrOverwriteRegion(
+                    subpartition, createSingleUnreleasedRegion(11, 11L, 2));
+
+            // segment3: region1(7-9), min: 7, max: 9
+            spilledRegionManager.appendOrOverwriteRegion(
+                    subpartition, createSingleUnreleasedRegion(7, 7L, 2));
+
+            // find the target region.
+            long regionOffset = spilledRegionManager.findRegion(subpartition, 3, true);
+            assertThat(regionOffset).isNotEqualTo(-1L);
+            assertThat(loadedRegions).hasSize(2);
+            // the target region must be put into the cache last.
+            assertRegionEquals(loadedRegions.get(1), targetRegion);
+        }
+    }
+
+    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
+            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+        return createSpilledRegionManager(256, cacheRegionConsumer);
+    }
+
+    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
+            int segmentSize, BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+        int numSubpartitions = 2;
+        return new HsFileDataIndexSpilledRegionManagerImpl.Factory(segmentSize, Long.MAX_VALUE)
+                .create(numSubpartitions, indexFilePath, cacheRegionConsumer);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index bd26b6d56a8..279f12d2782 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -24,12 +24,15 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test utils for hybrid shuffle mode. */
 public class HybridShuffleTestUtils {
     public static final int MEMORY_SEGMENT_SIZE = 128;
@@ -69,4 +72,38 @@ public class HybridShuffleTestUtils {
     public static HsOutputMetrics createTestingOutputMetrics() {
         return new HsOutputMetrics(new TestCounter(), new TestCounter());
     }
+
+    public static InternalRegion createSingleUnreleasedRegion(
+            int firstBufferIndex, long firstBufferOffset, int numBuffersPerRegion) {
+        return new InternalRegion(
+                firstBufferIndex,
+                firstBufferOffset,
+                numBuffersPerRegion,
+                new boolean[numBuffersPerRegion]);
+    }
+
+    public static List<InternalRegion> createAllUnreleasedRegions(
+            int firstBufferIndex, long firstBufferOffset, int numBuffersPerRegion, int numRegions) {
+        List<InternalRegion> regions = new ArrayList<>();
+        int bufferIndex = firstBufferIndex;
+        long bufferOffset = firstBufferOffset;
+        for (int i = 0; i < numRegions; i++) {
+            regions.add(
+                    new InternalRegion(
+                            bufferIndex,
+                            bufferOffset,
+                            numBuffersPerRegion,
+                            new boolean[numBuffersPerRegion]));
+            bufferIndex += numBuffersPerRegion;
+            bufferOffset += numBuffersPerRegion;
+        }
+        return regions;
+    }
+
+    public static void assertRegionEquals(InternalRegion expected, InternalRegion region) {
+        assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex());
+        assertThat(region.getFirstBufferOffset()).isEqualTo(expected.getFirstBufferOffset());
+        assertThat(region.getNumBuffers()).isEqualTo(expected.getNumBuffers());
+        assertThat(region.getReleased()).isEqualTo(expected.getReleased());
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
new file mode 100644
index 00000000000..0c287c9fb25
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+/** Mock {@link HsFileDataIndexSpilledRegionManager} for testing. */
+public class TestingFileDataIndexSpilledRegionManager
+        implements HsFileDataIndexSpilledRegionManager {
+    private final List<TreeMap<Integer, InternalRegion>> regions;
+
+    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
+
+    private int findRegionInvoked = 0;
+
+    public TestingFileDataIndexSpilledRegionManager(
+            int numSubpartitions, BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+        this.regions = new ArrayList<>();
+        this.cacheRegionConsumer = cacheRegionConsumer;
+        for (int i = 0; i < numSubpartitions; i++) {
+            regions.add(new TreeMap<>());
+        }
+    }
+
+    @Nullable
+    public InternalRegion getRegion(int subpartition, int bufferIndex) {
+        return regions.get(subpartition).get(bufferIndex);
+    }
+
+    public int getSpilledRegionSize(int subpartition) {
+        return regions.get(subpartition).size();
+    }
+
+    public int getFindRegionInvoked() {
+        return findRegionInvoked;
+    }
+
+    @Override
+    public void appendOrOverwriteRegion(int subpartition, InternalRegion region)
+            throws IOException {
+        regions.get(subpartition).put(region.getFirstBufferIndex(), region);
+    }
+
+    @Override
+    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
+        findRegionInvoked++;
+        InternalRegion region = regions.get(subpartition).get(bufferIndex);
+        if (region == null) {
+            return -1;
+        } else {
+            // a non -1 return value indicates that this region exists.
+            if (loadToCache) {
+                cacheRegionConsumer.accept(subpartition, region);
+            }
+            return 1;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing.
+    }
+
+    /** Factory of {@link TestingFileDataIndexSpilledRegionManager}. */
+    public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory {
+        public static final Factory INSTANCE = new Factory();
+
+        public TestingFileDataIndexSpilledRegionManager lastSpilledRegionManager;
+
+        public TestingFileDataIndexSpilledRegionManager getLastSpilledRegionManager() {
+            return lastSpilledRegionManager;
+        }
+
+        @Override
+        public HsFileDataIndexSpilledRegionManager create(
+                int numSubpartitions,
+                Path indexFilePath,
+                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+            TestingFileDataIndexSpilledRegionManager testingFileDataIndexSpilledRegionManager =
+                    new TestingFileDataIndexSpilledRegionManager(
+                            numSubpartitions, cacheRegionConsumer);
+            lastSpilledRegionManager = testingFileDataIndexSpilledRegionManager;
+            return testingFileDataIndexSpilledRegionManager;
+        }
+    }
+}
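
For context, the sketch below shows how the pieces introduced by this commit are intended to compose: a manager is built through the Factory, a region is spilled, and a later lookup loads it back through the cacheRegionConsumer. It is illustrative only and not part of the commit; the class name and printed messages are invented, and it must live in the org.apache.flink.runtime.io.network.partition.hybrid package because InternalRegion is an inner class of HsFileDataIndexImpl.

    package org.apache.flink.runtime.io.network.partition.hybrid;

    import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SpilledRegionManagerExample {
        public static void main(String[] args) throws Exception {
            // The index file must not exist yet: the manager opens it with CREATE_NEW.
            Path indexFile = Files.createTempDirectory("hybrid-shuffle").resolve("region-index");

            HsFileDataIndexSpilledRegionManager manager =
                    new HsFileDataIndexSpilledRegionManagerImpl.Factory(
                                    256, // segment size in bytes
                                    Long.MAX_VALUE) // max cache capacity
                            .create(
                                    1, // number of subpartitions
                                    indexFile,
                                    // called whenever a spilled region is loaded back into the
                                    // cache; in production this is the in-memory cache of
                                    // HsFileDataIndexImpl.
                                    (subpartition, region) ->
                                            System.out.println(
                                                    "caching region of subpartition "
                                                            + subpartition));

            // Spill a region covering buffer indexes 0..9 of subpartition 0.
            manager.appendOrOverwriteRegion(0, new InternalRegion(0, 0L, 10, new boolean[10]));

            // Look it up by any buffer index it contains; -1 would mean "not spilled".
            long offset = manager.findRegion(0, 5, /* loadToCache */ true);
            System.out.println("region found at index file offset " + offset);

            manager.close();
        }
    }

Passing Long.MAX_VALUE as the cache capacity makes shouldLoadEntireSegmentToCache return true, so a hit on any spilled region pushes its whole segment back through cacheRegionConsumer, with the target region delivered last so it is not evicted first.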
