This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1786e2ddfd1a1df5e907f5719138f63e819cb1a3 Author: Yuxin Tan <[email protected]> AuthorDate: Mon Jul 17 15:09:26 2023 +0800 [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file --- .../hybrid/index/FileRegionWriteReadUtils.java | 42 ++++++ .../tiered/file/ProducerMergedPartitionFile.java | 2 + .../file/ProducerMergedPartitionFileIndex.java | 149 ++++++++++++++------- .../file/ProducerMergedPartitionFileReader.java | 9 +- .../hybrid/tiered/tier/disk/DiskTierFactory.java | 19 ++- .../partition/hybrid/HybridShuffleTestUtils.java | 7 + .../hybrid/index/FileRegionWriteReadUtilsTest.java | 29 ++++ .../file/ProducerMergedPartitionFileIndexTest.java | 15 ++- .../ProducerMergedPartitionFileReaderTest.java | 10 +- .../ProducerMergedPartitionFileWriterTest.java | 2 + .../TestingProducerMergedPartitionFileIndex.java | 54 +++++++- 11 files changed, 275 insertions(+), 63 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java index 443bdbf863e..1ca689d0c2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.index; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.FixedSizeRegion; import java.io.IOException; import java.nio.ByteBuffer; @@ -105,4 +106,45 @@ public class FileRegionWriteReadUtils { } return new InternalRegion(firstBufferIndex, firstBufferOffset, numBuffers, released); } + + /** + * Write {@link FixedSizeRegion} to {@link FileChannel}. + * + * <p>Note that this type of region's length is fixed. + * + * @param channel the file's channel to write. + * @param regionBuffer the buffer to write {@link FixedSizeRegion}'s header. + * @param region the region to be written to channel. + */ + public static void writeFixedSizeRegionToFile( + FileChannel channel, ByteBuffer regionBuffer, FileDataIndexRegionHelper.Region region) + throws IOException { + regionBuffer.clear(); + regionBuffer.putInt(region.getFirstBufferIndex()); + regionBuffer.putInt(region.getNumBuffers()); + regionBuffer.putLong(region.getRegionFileOffset()); + regionBuffer.flip(); + BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), regionBuffer); + } + + /** + * Read {@link FixedSizeRegion} from {@link FileChannel}. + * + * <p>Note that this type of region's length is fixed. + * + * @param channel the channel to read. + * @param regionBuffer the buffer to read {@link FixedSizeRegion}'s header. + * @param fileOffset the file offset to start read. + * @return the {@link FixedSizeRegion} that read from this channel. + */ + public static FixedSizeRegion readFixedSizeRegionFromFile( + FileChannel channel, ByteBuffer regionBuffer, long fileOffset) throws IOException { + regionBuffer.clear(); + BufferReaderWriterUtil.readByteBufferFully(channel, regionBuffer, fileOffset); + regionBuffer.flip(); + int firstBufferIndex = regionBuffer.getInt(); + int numBuffers = regionBuffer.getInt(); + long firstBufferOffset = regionBuffer.getLong(); + return new FixedSizeRegion(firstBufferIndex, firstBufferOffset, numBuffers); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java index fef384286b7..00aa0344534 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java @@ -28,6 +28,8 @@ public class ProducerMergedPartitionFile { public static final String DATA_FILE_SUFFIX = ".tier-storage.data"; + public static final String INDEX_FILE_SUFFIX = ".tier-storage.index"; + public static ProducerMergedPartitionFileWriter createPartitionFileWriter( Path dataFilePath, ProducerMergedPartitionFileIndex partitionFileIndex) { return new ProducerMergedPartitionFileWriter(dataFilePath, partitionFileIndex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java index 34d86a2caf6..01cc9086d56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java @@ -18,18 +18,28 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexCache; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManagerImpl; +import org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.TreeMap; +import static org.apache.flink.runtime.io.network.partition.hybrid.index.FileRegionWriteReadUtils.allocateAndConfigureBuffer; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -39,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * <p>For efficiency, buffers from the same subpartition that are both logically (i.e. index in the * subpartition) and physically (i.e. offset in the file) consecutive are combined into a {@link - * Region}. + * FixedSizeRegion}. * * <pre>For example, the following buffers (indicated by subpartitionId-bufferIndex): * 1-1, 1-2, 1-3, 2-1, 2-2, 2-5, 1-4, 1-5, 2-6 @@ -49,6 +59,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class ProducerMergedPartitionFileIndex { + private final Path indexFilePath; + /** * The regions belonging to each subpartitions. * @@ -56,18 +68,26 @@ public class ProducerMergedPartitionFileIndex { * to ensure the thread safety. */ @GuardedBy("lock") - private final List<TreeMap<Integer, Region>> subpartitionRegions; - - @GuardedBy("lock") - private boolean isReleased; + private final FileDataIndexCache<FixedSizeRegion> indexCache; private final Object lock = new Object(); - public ProducerMergedPartitionFileIndex(int numSubpartitions) { - this.subpartitionRegions = new ArrayList<>(); - for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) { - subpartitionRegions.add(new TreeMap<>()); - } + public ProducerMergedPartitionFileIndex( + int numSubpartitions, + Path indexFilePath, + int regionGroupSizeInBytes, + long numRetainedInMemoryRegionsMax) { + this.indexFilePath = indexFilePath; + this.indexCache = + new FileDataIndexCache<>( + numSubpartitions, + indexFilePath, + numRetainedInMemoryRegionsMax, + new FileDataIndexSpilledRegionManagerImpl.Factory<>( + regionGroupSizeInBytes, + numRetainedInMemoryRegionsMax, + FixedSizeRegion.REGION_SIZE, + ProducerMergedPartitionFileDataIndexRegionHelper.INSTANCE)); } /** @@ -81,48 +101,34 @@ public class ProducerMergedPartitionFileIndex { return; } - Map<Integer, List<Region>> convertedRegions = convertToRegions(buffers); + Map<Integer, List<FixedSizeRegion>> convertedRegions = convertToRegions(buffers); synchronized (lock) { - convertedRegions.forEach( - (subpartition, regions) -> { - Map<Integer, Region> regionMap = subpartitionRegions.get(subpartition); - for (Region region : regions) { - regionMap.put(region.getFirstBufferIndex(), region); - } - }); + convertedRegions.forEach(indexCache::put); } } /** - * Get the subpartition's {@link Region} containing the specific buffer index. + * Get the subpartition's {@link FixedSizeRegion} containing the specific buffer index. * * @param subpartitionId the subpartition id * @param bufferIndex the buffer index * @return the region containing the buffer index, or return emtpy if the region is not found. */ - Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int bufferIndex) { + Optional<FixedSizeRegion> getRegion( + TieredStorageSubpartitionId subpartitionId, int bufferIndex) { synchronized (lock) { - if (isReleased) { - return Optional.empty(); - } - Map.Entry<Integer, Region> regionEntry = - subpartitionRegions - .get(subpartitionId.getSubpartitionId()) - .floorEntry(bufferIndex); - if (regionEntry == null) { - return Optional.empty(); - } - Region region = regionEntry.getValue(); - return bufferIndex < region.getFirstBufferIndex() + region.numBuffers - ? Optional.of(region) - : Optional.empty(); + return indexCache.get(subpartitionId.getSubpartitionId(), bufferIndex); } } void release() { synchronized (lock) { - subpartitionRegions.clear(); - isReleased = true; + try { + indexCache.close(); + IOUtils.deleteFileQuietly(indexFilePath); + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } } } @@ -130,8 +136,9 @@ public class ProducerMergedPartitionFileIndex { // Internal Methods // ------------------------------------------------------------------------ - private static Map<Integer, List<Region>> convertToRegions(List<FlushedBuffer> buffers) { - Map<Integer, List<Region>> subpartitionRegionMap = new HashMap<>(); + private static Map<Integer, List<FixedSizeRegion>> convertToRegions( + List<FlushedBuffer> buffers) { + Map<Integer, List<FixedSizeRegion>> subpartitionRegionMap = new HashMap<>(); Iterator<FlushedBuffer> iterator = buffers.iterator(); FlushedBuffer firstBufferInRegion = iterator.next(); FlushedBuffer lastBufferInRegion = firstBufferInRegion; @@ -155,7 +162,7 @@ public class ProducerMergedPartitionFileIndex { private static void addRegionToMap( FlushedBuffer firstBufferInRegion, FlushedBuffer lastBufferInRegion, - Map<Integer, List<Region>> subpartitionRegionMap) { + Map<Integer, List<FixedSizeRegion>> subpartitionRegionMap) { checkArgument( firstBufferInRegion.getSubpartitionId() == lastBufferInRegion.getSubpartitionId()); checkArgument(firstBufferInRegion.getBufferIndex() <= lastBufferInRegion.getBufferIndex()); @@ -163,7 +170,7 @@ public class ProducerMergedPartitionFileIndex { subpartitionRegionMap .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), ArrayList::new) .add( - new Region( + new FixedSizeRegion( firstBufferInRegion.getBufferIndex(), firstBufferInRegion.getFileOffset(), lastBufferInRegion.getBufferIndex() @@ -205,6 +212,38 @@ public class ProducerMergedPartitionFileIndex { } } + /** + * The implementation of {@link FileDataIndexRegionHelper} to writing a region to the file or + * reading a region from the file. + * + * <p>Note that this type of region's length is fixed. + */ + static class ProducerMergedPartitionFileDataIndexRegionHelper + implements FileDataIndexRegionHelper<FixedSizeRegion> { + + /** Reusable buffer used to read and write the immutable part of region. */ + private final ByteBuffer regionBuffer = + allocateAndConfigureBuffer(FixedSizeRegion.REGION_SIZE); + + static final ProducerMergedPartitionFileDataIndexRegionHelper INSTANCE = + new ProducerMergedPartitionFileDataIndexRegionHelper(); + + private ProducerMergedPartitionFileDataIndexRegionHelper() {} + + @Override + public void writeRegionToFile(FileChannel channel, FixedSizeRegion region) + throws IOException { + FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, regionBuffer, region); + } + + @Override + public FixedSizeRegion readRegionFromFile(FileChannel channel, long fileOffset) + throws IOException { + return FileRegionWriteReadUtils.readFixedSizeRegionFromFile( + channel, regionBuffer, fileOffset); + } + } + /** * Represents a series of buffers that are: * @@ -213,8 +252,12 @@ public class ProducerMergedPartitionFileIndex { * <li>Logically (i.e. buffer index) consecutive * <li>Physically (i.e. offset in the file) consecutive * </ul> + * + * <p>Note that the region has a fixed size. */ - static class Region { + public static class FixedSizeRegion implements FileDataIndexRegionHelper.Region { + + public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + Integer.BYTES; /** The buffer index of first buffer. */ private final int firstBufferIndex; @@ -225,21 +268,35 @@ public class ProducerMergedPartitionFileIndex { /** The number of buffers that the region contains. */ private final int numBuffers; - Region(int firstBufferIndex, long regionFileOffset, int numBuffers) { + public FixedSizeRegion(int firstBufferIndex, long regionFileOffset, int numBuffers) { this.firstBufferIndex = firstBufferIndex; this.regionFileOffset = regionFileOffset; this.numBuffers = numBuffers; } - long getRegionFileOffset() { + @Override + public boolean containBuffer(int bufferIndex) { + return bufferIndex >= firstBufferIndex && bufferIndex < firstBufferIndex + numBuffers; + } + + /** Get the total size in bytes of this region, including the fields and the buffers. */ + @Override + public int getSize() { + return REGION_SIZE + numBuffers; + } + + @Override + public long getRegionFileOffset() { return regionFileOffset; } - int getNumBuffers() { + @Override + public int getNumBuffers() { return numBuffers; } - int getFirstBufferIndex() { + @Override + public int getFirstBufferIndex() { return firstBufferIndex; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java index d82aa78659f..00b8bd49db4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java @@ -40,7 +40,6 @@ import java.util.Optional; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel; -import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.Region; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -192,7 +191,8 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader { Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean removeKey) { BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey); if (bufferOffsetCache == null) { - Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, cacheKey.f1); + Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt = + dataIndex.getRegion(cacheKey.f0, cacheKey.f1); return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region)); } else { if (removeKey) { @@ -211,13 +211,14 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader { */ private class BufferOffsetCache { - private final Region region; + private final ProducerMergedPartitionFileIndex.FixedSizeRegion region; private long fileOffset; private int nextBufferIndex; - private BufferOffsetCache(int bufferIndex, Region region) { + private BufferOffsetCache( + int bufferIndex, ProducerMergedPartitionFileIndex.FixedSizeRegion region) { this.nextBufferIndex = bufferIndex; this.region = region; moveFileOffsetToBuffer(bufferIndex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java index 65288fe01a6..fad6677f4b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.concurrent.ScheduledExecutorService; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX; /** The implementation of {@link TierFactory} for disk tier. */ public class DiskTierFactory implements TierFactory { @@ -51,11 +52,21 @@ public class DiskTierFactory implements TierFactory { private final float minReservedDiskSpaceFraction; + private final int regionGroupSizeInBytes; + + private final long numRetainedInMemoryRegionsMax; + public DiskTierFactory( - int numBytesPerSegment, int bufferSizeBytes, float minReservedDiskSpaceFraction) { + int numBytesPerSegment, + int bufferSizeBytes, + float minReservedDiskSpaceFraction, + int regionGroupSizeInBytes, + long numRetainedInMemoryRegionsMax) { this.numBytesPerSegment = numBytesPerSegment; this.bufferSizeBytes = bufferSizeBytes; this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction; + this.regionGroupSizeInBytes = regionGroupSizeInBytes; + this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; } @Override @@ -78,7 +89,11 @@ public class DiskTierFactory implements TierFactory { Duration bufferRequestTimeout, int maxBufferReadAhead) { ProducerMergedPartitionFileIndex partitionFileIndex = - new ProducerMergedPartitionFileIndex(isBroadcastOnly ? 1 : numSubpartitions); + new ProducerMergedPartitionFileIndex( + isBroadcastOnly ? 1 : numSubpartitions, + Paths.get(dataFileBasePath + INDEX_FILE_SUFFIX), + regionGroupSizeInBytes, + numRetainedInMemoryRegionsMax); Path dataFilePath = Paths.get(dataFileBasePath + DATA_FILE_SUFFIX); ProducerMergedPartitionFileWriter partitionFileWriter = ProducerMergedPartitionFile.createPartitionFileWriter( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java index bbb88314b4c..5b0629bd1a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex; import java.util.ArrayDeque; import java.util.ArrayList; @@ -116,6 +117,12 @@ public class HybridShuffleTestUtils { return regions; } + public static FileDataIndexRegionHelper.Region createSingleFixedSizeRegion( + int firstBufferIndex, long firstBufferOffset, int numBuffersPerRegion) { + return new ProducerMergedPartitionFileIndex.FixedSizeRegion( + firstBufferIndex, firstBufferOffset, numBuffersPerRegion); + } + public static void assertRegionEquals( FileDataIndexRegionHelper.Region expected, FileDataIndexRegionHelper.Region region) { assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java index fc2cf847002..8e04f24186e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.index; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -34,6 +35,7 @@ import java.util.UUID; import static org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleFixedSizeRegion; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -78,6 +80,33 @@ class FileRegionWriteReadUtilsTest { assertRegionEquals(readRegion, region); } + @Test + void testReadPrematureEndOfFileForFixedSizeRegion(@TempDir Path tmpPath) throws Exception { + FileChannel channel = tmpFileChannel(tmpPath); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + FileRegionWriteReadUtils.writeFixedSizeRegionToFile( + channel, buffer, createSingleFixedSizeRegion(0, 0L, 1)); + channel.truncate(channel.position() - 1); + buffer.flip(); + assertThatThrownBy( + () -> + FileRegionWriteReadUtils.readFixedSizeRegionFromFile( + channel, buffer, 0L)) + .isInstanceOf(IOException.class); + } + + @Test + void testWriteAndReadFixedSizeRegion(@TempDir Path tmpPath) throws Exception { + FileChannel channel = tmpFileChannel(tmpPath); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + FileDataIndexRegionHelper.Region region = createSingleFixedSizeRegion(10, 100L, 1); + FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, buffer, region); + buffer.flip(); + ProducerMergedPartitionFileIndex.FixedSizeRegion readRegion = + FileRegionWriteReadUtils.readFixedSizeRegionFromFile(channel, buffer, 0L); + assertRegionEquals(readRegion, region); + } + private static FileChannel tmpFileChannel(Path tempPath) throws IOException { return FileChannel.open( Files.createFile(tempPath.resolve(UUID.randomUUID().toString())), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java index 74d96f1c6d1..615e2235643 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java @@ -21,8 +21,11 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -35,13 +38,21 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link ProducerMergedPartitionFileIndex}. */ class ProducerMergedPartitionFileIndexTest { + private Path indexFilePath; + + @BeforeEach + void before(@TempDir Path tempDir) { + this.indexFilePath = tempDir.resolve(".index"); + } + @Test void testAddBufferAndGetRegion() { int numSubpartitions = 5; int numBuffersPerSubpartition = 10; ProducerMergedPartitionFileIndex partitionFileIndex = - new ProducerMergedPartitionFileIndex(numSubpartitions); + new ProducerMergedPartitionFileIndex( + numSubpartitions, indexFilePath, 256, Long.MAX_VALUE); List<ProducerMergedPartitionFileIndex.FlushedBuffer> flushedBuffers = new ArrayList<>(); Tuple2<Integer, Integer> numExpectedRegionsAndMaxBufferIndex = @@ -91,7 +102,7 @@ class ProducerMergedPartitionFileIndexTest { for (int i = 0; i < numSubpartitions; i++) { subpartitionFirstBufferIndexes.add(new HashSet<>()); for (int j = 0; j <= maxBufferIndex; j++) { - Optional<ProducerMergedPartitionFileIndex.Region> region = + Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> region = partitionFileIndex.getRegion(new TieredStorageSubpartitionId(i), j); if (region.isPresent()) { subpartitionFirstBufferIndexes.get(i).add(region.get().getFirstBufferIndex()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java index 7d6f3b25da0..fc29e07c045 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java @@ -58,6 +58,8 @@ class ProducerMergedPartitionFileReaderTest { private static final String DEFAULT_TEST_FILE_NAME = "testFile"; + private static final String DEFAULT_TEST_INDEX_NAME = "testIndex"; + private static final TieredStoragePartitionId DEFAULT_PARTITION_ID = TieredStorageIdMappingUtils.convertId(new ResultPartitionID()); @@ -72,8 +74,10 @@ class ProducerMergedPartitionFileReaderTest { @BeforeEach void before() throws ExecutionException, InterruptedException { + Path testIndexPath = new File(tempFolder.toFile(), DEFAULT_TEST_INDEX_NAME).toPath(); ProducerMergedPartitionFileIndex partitionFileIndex = - new ProducerMergedPartitionFileIndex(DEFAULT_NUM_SUBPARTITION); + new ProducerMergedPartitionFileIndex( + DEFAULT_NUM_SUBPARTITION, testIndexPath, 256, Long.MAX_VALUE); testFilePath = new File(tempFolder.toFile(), DEFAULT_TEST_FILE_NAME).toPath(); ProducerMergedPartitionFileWriter partitionFileWriter = new ProducerMergedPartitionFileWriter(testFilePath, partitionFileIndex); @@ -133,11 +137,13 @@ class ProducerMergedPartitionFileReaderTest { AtomicInteger indexQueryTime = new AtomicInteger(0); TestingProducerMergedPartitionFileIndex partitionFileIndex = new TestingProducerMergedPartitionFileIndex.Builder() + .setIndexFilePath(new File(tempFolder.toFile(), "test-Index").toPath()) .setGetRegionFunction( (subpartitionId, integer) -> { indexQueryTime.incrementAndGet(); return Optional.of( - new ProducerMergedPartitionFileIndex.Region(0, 0, 2)); + new ProducerMergedPartitionFileIndex.FixedSizeRegion( + 0, 0, 2)); }) .build(); partitionFileReader = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java index ddde5377567..be93b80b72b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriterTest.java @@ -50,6 +50,7 @@ class ProducerMergedPartitionFileWriterTest { AtomicInteger receivedBuffers = new AtomicInteger(0); TestingProducerMergedPartitionFileIndex partitionFileIndex = new TestingProducerMergedPartitionFileIndex.Builder() + .setIndexFilePath(new File(tempFolder.toFile(), "testIndex").toPath()) .setAddBuffersConsumer(buffers -> receivedBuffers.getAndAdd(buffers.size())) .build(); @@ -83,6 +84,7 @@ class ProducerMergedPartitionFileWriterTest { AtomicBoolean isReleased = new AtomicBoolean(false); TestingProducerMergedPartitionFileIndex partitionFileIndex = new TestingProducerMergedPartitionFileIndex.Builder() + .setIndexFilePath(new File(tempFolder.toFile(), "testIndex").toPath()) .setReleaseRunnable(() -> isReleased.set(true)) .build(); ProducerMergedPartitionFileWriter partitionFileWriter = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java index 74a4bc8708d..52859f3b21c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingProducerMergedPartitionFileIndex.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import java.nio.file.Path; import java.util.List; import java.util.Optional; import java.util.function.BiFunction; @@ -30,17 +31,25 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti private final Consumer<List<FlushedBuffer>> addBuffersConsumer; - private final BiFunction<TieredStorageSubpartitionId, Integer, Optional<Region>> + private final BiFunction<TieredStorageSubpartitionId, Integer, Optional<FixedSizeRegion>> getRegionFunction; private final Runnable releaseRunnable; private TestingProducerMergedPartitionFileIndex( int numSubpartitions, + Path indexFilePath, + int regionGroupSizeInBytes, + long numRetainedInMemoryRegionsMax, Consumer<List<FlushedBuffer>> addBuffersConsumer, - BiFunction<TieredStorageSubpartitionId, Integer, Optional<Region>> getRegionFunction, + BiFunction<TieredStorageSubpartitionId, Integer, Optional<FixedSizeRegion>> + getRegionFunction, Runnable releaseRunnable) { - super(numSubpartitions); + super( + numSubpartitions, + indexFilePath, + regionGroupSizeInBytes, + numRetainedInMemoryRegionsMax); this.addBuffersConsumer = addBuffersConsumer; this.getRegionFunction = getRegionFunction; this.releaseRunnable = releaseRunnable; @@ -52,7 +61,8 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti } @Override - Optional<Region> getRegion(TieredStorageSubpartitionId subpartitionId, int bufferIndex) { + Optional<FixedSizeRegion> getRegion( + TieredStorageSubpartitionId subpartitionId, int bufferIndex) { return getRegionFunction.apply(subpartitionId, bufferIndex); } @@ -66,9 +76,15 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti private int numSubpartitions = 1; + private Path indexFilePath = null; + + private int regionGroupSizeInBytes = 256; + + private long numRetainedInMemoryRegionsMax = Long.MAX_VALUE; + private Consumer<List<FlushedBuffer>> addBuffersConsumer = flushedBuffers -> {}; - private BiFunction<TieredStorageSubpartitionId, Integer, Optional<Region>> + private BiFunction<TieredStorageSubpartitionId, Integer, Optional<FixedSizeRegion>> getRegionFunction = (tieredStorageSubpartitionId, integer) -> Optional.empty(); private Runnable releaseRunnable = () -> {}; @@ -81,6 +97,24 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti return this; } + public TestingProducerMergedPartitionFileIndex.Builder setIndexFilePath( + Path indexFilePath) { + this.indexFilePath = indexFilePath; + return this; + } + + public TestingProducerMergedPartitionFileIndex.Builder setRegionGroupSizeInBytes( + int regionGroupSizeInBytes) { + this.regionGroupSizeInBytes = regionGroupSizeInBytes; + return this; + } + + public TestingProducerMergedPartitionFileIndex.Builder setNumRetainedInMemoryRegionsMax( + long numRetainedInMemoryRegionsMax) { + this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; + return this; + } + public TestingProducerMergedPartitionFileIndex.Builder setAddBuffersConsumer( Consumer<List<FlushedBuffer>> addBuffersConsumer) { this.addBuffersConsumer = addBuffersConsumer; @@ -88,7 +122,7 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti } public TestingProducerMergedPartitionFileIndex.Builder setGetRegionFunction( - BiFunction<TieredStorageSubpartitionId, Integer, Optional<Region>> + BiFunction<TieredStorageSubpartitionId, Integer, Optional<FixedSizeRegion>> getRegionFunction) { this.getRegionFunction = getRegionFunction; return this; @@ -102,7 +136,13 @@ public class TestingProducerMergedPartitionFileIndex extends ProducerMergedParti public TestingProducerMergedPartitionFileIndex build() { return new TestingProducerMergedPartitionFileIndex( - numSubpartitions, addBuffersConsumer, getRegionFunction, releaseRunnable); + numSubpartitions, + indexFilePath, + regionGroupSizeInBytes, + numRetainedInMemoryRegionsMax, + addBuffersConsumer, + getRegionFunction, + releaseRunnable); } } }
