This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 910dc1b0babd6b00eea5b7052de5a20d5babfe28
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jan 5 15:03:34 2023 +0800

    [FLINK-30332][network] Introduce HsFileDataIndexSpilledRegionManager to 
manage spilled regions.
---
 .../partition/hybrid/HsFileDataIndexImpl.java      |   2 +-
 .../HsFileDataIndexSpilledRegionManager.java       |  58 +++
 .../HsFileDataIndexSpilledRegionManagerImpl.java   | 402 +++++++++++++++++++++
 ...sFileDataIndexSpilledRegionManagerImplTest.java | 195 ++++++++++
 .../partition/hybrid/HybridShuffleTestUtils.java   |  37 ++
 .../TestingFileDataIndexSpilledRegionManager.java  | 111 ++++++
 6 files changed, 804 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index de0081b559d..5249b644b5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -198,7 +198,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex 
{
             this.released = released;
         }
 
-        private boolean containBuffer(int bufferIndex) {
        /** Returns true iff the given buffer index lies within this region's buffer range. */
        boolean containBuffer(int bufferIndex) {
            // range is [firstBufferIndex, firstBufferIndex + numBuffers), i.e. end-exclusive.
            return bufferIndex >= firstBufferIndex && bufferIndex < firstBufferIndex + numBuffers;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
new file mode 100644
index 00000000000..e0a413a5c9f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.function.BiConsumer;
+
/** This class is responsible for spilling regions to disk and managing these spilled regions. */
public interface HsFileDataIndexSpilledRegionManager extends AutoCloseable {
    /**
     * Write this region to the index file. If the target region was already spilled, overwrite it.
     *
     * @param subpartition the subpartition id of this region.
     * @param region the region to be spilled to the index file.
     */
    void appendOrOverwriteRegion(int subpartition, InternalRegion region) throws IOException;

    /**
     * Find the region that contains the target bufferIndex and belongs to the target subpartition.
     *
     * @param subpartition the subpartition id that the target region belongs to.
     * @param bufferIndex the buffer index that the target region contains.
     * @param loadToCache whether to load the found region into the cache.
     * @return if the target region can be found, return its offset in the index file. Otherwise,
     *     return -1.
     */
    long findRegion(int subpartition, int bufferIndex, boolean loadToCache);

    /** Close this spilled region manager and release its underlying resources. */
    void close() throws IOException;

    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
    interface Factory {
        HsFileDataIndexSpilledRegionManager create(
                int numSubpartitions,
                Path indexFilePath,
                BiConsumer<Integer, InternalRegion> cacheRegionConsumer);
    }
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
new file mode 100644
index 00000000000..c87c0242b7e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+
/**
 * Default implementation of {@link HsFileDataIndexSpilledRegionManager}. This manager will handle
 * and spill regions in the following way:
 *
 * <ul>
 *   <li>All regions will be written to the same file, namely the index file.
 *   <li>Multiple regions belonging to the same subpartition form a segment.
 *   <li>The regions in the same segment have no special relationship, but are only related to the
 *       order in which they are spilled.
 *   <li>Each segment is independent. Even if the previous segment is not full, the next segment can
 *       still be allocated.
 *   <li>If a region has been written to the index file already, spilling it again will overwrite
 *       the previous region.
 *   <li>A very large region will monopolize a single segment.
 * </ul>
 *
 * <p>The relationship between the index file and its segments is shown below.
 *
 * <pre>
 *
 *         - - - - - - - - - Index File - - - - - - - - - - - -
 *        |                                                     |
 *        | - - - - Segment1 - - - -   - - - - Segment2- - - -  |
 *        ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 |    |
 *        | - - - - - - - - - - - -   - - - - - - - - - - - -   |
 *        |                                                     |
 *        | - - - - - - - -Segment3 - - - - - - -               |
 *        ||              Big Region             |              |
 *        | - - - - - - - - - - - - - - - - - - -               |
 *         - - - - - - - - - - - - - - - - - - - - - - - - - -
 * </pre>
 */
public class HsFileDataIndexSpilledRegionManagerImpl
        implements HsFileDataIndexSpilledRegionManager {

    /** Reusable buffer used to read and write the immutable part (header) of a region. */
    private final ByteBuffer regionHeaderBuffer =
            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE);

    /**
     * List of subpartition's segment metas. Each element is a treeMap containing all {@link
     * SegmentMeta}'s of the specific subpartition corresponding to the subscript. The value of this
     * treeMap is a {@link SegmentMeta}, and the key is the minBufferIndex of that segment. Only
     * finished (i.e. no longer appended) segments are put here; the segment currently being written
     * for each subpartition lives in {@code currentSegmentMeta} instead.
     */
    private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas;

    // Channel of the index file; opened in the constructor, closed via close().
    private FileChannel channel;

    /** The offset of the next segment; a new segment will start from this offset. */
    private long nextSegmentOffset = 0L;

    /** Per-subpartition write offset inside that subpartition's current segment. */
    private final long[] subpartitionCurrentOffset;

    /** Free space of every subpartition's current segment. */
    private final int[] subpartitionFreeSpaceInBytes;

    /** Metadata of every subpartition's current (still appendable) segment. */
    private final SegmentMeta[] currentSegmentMeta;

    /**
     * Default size of a segment. If the size of a region is larger than this value, it will be
     * allocated and occupy a single segment.
     */
    private final int segmentSizeInBytes;

    /**
     * This consumer is used to load a region into the cache. The first parameter is the
     * subpartition id, and the second parameter is the region to load.
     */
    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;

    /**
     * When a region in a segment needs to be loaded into the cache, whether to load all regions of
     * the entire segment.
     */
    private final boolean loadEntireSegmentToCache;

    public HsFileDataIndexSpilledRegionManagerImpl(
            int numSubpartitions,
            Path indexFilePath,
            int segmentSizeInBytes,
            long maxCacheCapacity,
            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
        try {
            // CREATE_NEW: the index file must not already exist.
            this.channel =
                    FileChannel.open(
                            indexFilePath,
                            StandardOpenOption.CREATE_NEW,
                            StandardOpenOption.READ,
                            StandardOpenOption.WRITE);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
        this.loadEntireSegmentToCache =
                shouldLoadEntireSegmentToCache(
                        numSubpartitions, segmentSizeInBytes, maxCacheCapacity);
        this.subpartitionFinishedSegmentMetas = new ArrayList<>(numSubpartitions);
        this.subpartitionCurrentOffset = new long[numSubpartitions];
        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
        this.currentSegmentMeta = new SegmentMeta[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitionFinishedSegmentMetas.add(new TreeMap<>());
        }
        this.cacheRegionConsumer = cacheRegionConsumer;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    @Override
    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
        // first of all, find the region in the current writing segment.
        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
        if (segmentMeta != null) {
            long regionOffset =
                    findRegionInSegment(subpartition, bufferIndex, segmentMeta, loadToCache);
            if (regionOffset != -1) {
                return regionOffset;
            }
        }

        // next, find the region in the finished segments.
        TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap =
                subpartitionFinishedSegmentMetas.get(subpartition);
        // all segments with a minBufferIndex less than or equal to the target buffer index may
        // contain the target region.
        for (SegmentMeta meta :
                subpartitionSegmentMetaTreeMap.headMap(bufferIndex, true).values()) {
            long regionOffset = findRegionInSegment(subpartition, bufferIndex, meta, loadToCache);
            if (regionOffset != -1) {
                return regionOffset;
            }
        }
        return -1;
    }

    private long findRegionInSegment(
            int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) {
        // the segment can only contain the target if bufferIndex does not exceed its max index.
        if (bufferIndex <= meta.getMaxBufferIndex()) {
            try {
                return readSegmentAndLoadToCacheIfNeeded(
                        subpartition, bufferIndex, meta, loadToCache);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        // -1 indicates that the target region is not found in this segment
        return -1;
    }

    private long readSegmentAndLoadToCacheIfNeeded(
            int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache)
            throws IOException {
        // read all regions belonging to this segment.
        List<Tuple2<InternalRegion, Long>> regionAndOffsets =
                readSegment(meta.getOffset(), meta.getNumRegions());
        // -1 indicates that the target region is not found in this segment.
        long targetRegionOffset = -1;
        InternalRegion targetRegion = null;
        // traverse all regions to find the target.
        Iterator<Tuple2<InternalRegion, Long>> it = regionAndOffsets.iterator();
        while (it.hasNext()) {
            Tuple2<InternalRegion, Long> regionAndOffset = it.next();
            InternalRegion region = regionAndOffset.f0;
            // whether the region contains this buffer.
            if (region.containBuffer(bufferIndex)) {
                // target region is found; remove it from the list so that the loop below only
                // sees the non-target regions.
                targetRegion = region;
                targetRegionOffset = regionAndOffset.f1;
                it.remove();
            }
        }

        // target region is found and needs to be loaded into the cache.
        if (targetRegion != null && loadToCache) {
            if (loadEntireSegmentToCache) {
                // first of all, load all regions except the target into the cache.
                regionAndOffsets.forEach(
                        (regionAndOffsetTuple) ->
                                cacheRegionConsumer.accept(subpartition, regionAndOffsetTuple.f0));
                // load the target region into the cache last; this prevents the target
                // from being evicted (by the sibling regions just loaded) before use.
                cacheRegionConsumer.accept(subpartition, targetRegion);
            } else {
                // only load the target region into the cache.
                cacheRegionConsumer.accept(subpartition, targetRegion);
            }
        }
        // return the offset of the target region.
        return targetRegionOffset;
    }

    @Override
    public void appendOrOverwriteRegion(int subpartition, InternalRegion newRegion)
            throws IOException {
        // This method will only be called when we want to evict a region. We can't let the
        // region be reloaded into the cache, otherwise it will lead to an infinite loop,
        // hence loadToCache = false.
        long oldRegionOffset = findRegion(subpartition, newRegion.getFirstBufferIndex(), false);
        if (oldRegionOffset != -1) {
            // if the region already exists in the file, overwrite it in place.
            writeRegionToOffset(oldRegionOffset, newRegion);
        } else {
            // otherwise, append the region to the subpartition's current segment.
            appendRegion(subpartition, newRegion);
        }
    }

    @Override
    public void close() throws IOException {
        if (channel != null) {
            channel.close();
        }
    }

    private static boolean shouldLoadEntireSegmentToCache(
            int numSubpartitions, int segmentSizeInBytes, long maxCacheCapacity) {
        // If the cache can hold at least two segments (one for reading and one for writing) for
        // each subpartition, it is reasonable to load the entire segment into memory, which can
        // improve the cache hit rate. On the contrary, if the cache capacity is small, loading a
        // large number of regions will lead to performance degradation, and only the target region
        // should be loaded.
        return ((long) 2 * numSubpartitions * segmentSizeInBytes) / InternalRegion.HEADER_SIZE
                <= maxCacheCapacity;
    }

    private void appendRegion(int subpartition, InternalRegion region) throws IOException {
        int regionSize = region.getSize();
        // check whether we have enough space to append this region.
        if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
            // Not enough free space, start a new segment. Note that if the region is larger than
            // the segment size, this will start a new segment that only contains the big region.
            startNewSegment(subpartition, Math.max(regionSize, segmentSizeInBytes));
        }
        // spill this region to the current offset of the index file.
        writeRegionToOffset(subpartitionCurrentOffset[subpartition], region);
        // a new region was appended to the segment, update its bookkeeping.
        updateSegment(subpartition, region);
    }

    private void writeRegionToOffset(long offset, InternalRegion region) throws IOException {
        channel.position(offset);
        InternalRegionWriteReadUtils.writeRegionToFile(channel, regionHeaderBuffer, region);
    }

    private void startNewSegment(int subpartition, int newSegmentSize) {
        SegmentMeta oldSegmentMeta = currentSegmentMeta[subpartition];
        currentSegmentMeta[subpartition] = new SegmentMeta(nextSegmentOffset);
        subpartitionCurrentOffset[subpartition] = nextSegmentOffset;
        nextSegmentOffset += newSegmentSize;
        subpartitionFreeSpaceInBytes[subpartition] = newSegmentSize;
        if (oldSegmentMeta != null) {
            // put the finished segment into subpartitionFinishedSegmentMetas, keyed by its
            // minBufferIndex so findRegion can prune the search with headMap.
            subpartitionFinishedSegmentMetas
                    .get(subpartition)
                    .put(oldSegmentMeta.minBufferIndex, oldSegmentMeta);
        }
    }

    private void updateSegment(int subpartition, InternalRegion region) {
        int regionSize = region.getSize();
        subpartitionFreeSpaceInBytes[subpartition] -= regionSize;
        subpartitionCurrentOffset[subpartition] += regionSize;
        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
        segmentMeta.addRegion(
                region.getFirstBufferIndex(),
                // last buffer index of the region, inclusive.
                region.getFirstBufferIndex() + region.getNumBuffers() - 1);
    }

    /**
     * Read a segment from the index file.
     *
     * @param offset offset of this segment.
     * @param numRegions number of regions in this segment.
     * @return list of all regions and their offsets belonging to this segment.
     */
    private List<Tuple2<InternalRegion, Long>> readSegment(long offset, int numRegions)
            throws IOException {
        List<Tuple2<InternalRegion, Long>> regionAndOffsets = new ArrayList<>();
        for (int i = 0; i < numRegions; i++) {
            InternalRegion region =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            channel, regionHeaderBuffer, offset);
            regionAndOffsets.add(Tuple2.of(region, offset));
            // regions are laid out back-to-back; advance by the size just read.
            offset += region.getSize();
        }
        return regionAndOffsets;
    }

    /**
     * Metadata of a spilled regions segment. When a segment is finished (i.e. no longer appended),
     * its corresponding {@link SegmentMeta} becomes immutable.
     */
    private static class SegmentMeta {
        /**
         * Minimum buffer index of this segment. It is the smallest bufferIndex (inclusive) over all
         * regions belonging to this segment.
         */
        private int minBufferIndex;

        /**
         * Maximum buffer index of this segment. It is the largest bufferIndex (inclusive) over all
         * regions belonging to this segment.
         */
        private int maxBufferIndex;

        /** Number of regions belonging to this segment. */
        private int numRegions;

        /** The index file offset of this segment. */
        private final long offset;

        public SegmentMeta(long offset) {
            this.offset = offset;
            // start with an empty range; the first addRegion call establishes real bounds.
            this.minBufferIndex = Integer.MAX_VALUE;
            this.maxBufferIndex = 0;
            this.numRegions = 0;
        }

        public int getMaxBufferIndex() {
            return maxBufferIndex;
        }

        public long getOffset() {
            return offset;
        }

        public int getNumRegions() {
            return numRegions;
        }

        /** Extends this segment's buffer-index range to cover a newly appended region. */
        public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) {
            if (firstBufferIndexOfRegion < minBufferIndex) {
                this.minBufferIndex = firstBufferIndexOfRegion;
            }
            if (maxBufferIndexOfRegion > maxBufferIndex) {
                this.maxBufferIndex = maxBufferIndexOfRegion;
            }
            this.numRegions++;
        }
    }

    /** Factory of {@link HsFileDataIndexSpilledRegionManager}. */
    public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory {
        private final int segmentSizeInBytes;

        private final long maxCacheCapacity;

        public Factory(int segmentSizeInBytes, long maxCacheCapacity) {
            this.segmentSizeInBytes = segmentSizeInBytes;
            this.maxCacheCapacity = maxCacheCapacity;
        }

        @Override
        public HsFileDataIndexSpilledRegionManager create(
                int numSubpartitions,
                Path indexFilePath,
                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
            return new HsFileDataIndexSpilledRegionManagerImpl(
                    numSubpartitions,
                    indexFilePath,
                    segmentSizeInBytes,
                    maxCacheCapacity,
                    cacheRegionConsumer);
        }
    }
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
new file mode 100644
index 00000000000..bb4c31a4f2d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImplTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Tests for {@link HsFileDataIndexSpilledRegionManagerImpl}. */
class HsFileDataIndexSpilledRegionManagerImplTest {
    // Path of the index file used by each test; a fresh random name per test.
    private Path indexFilePath;

    @BeforeEach
    void before(@TempDir Path tmpPath) {
        indexFilePath = tmpPath.resolve(UUID.randomUUID().toString());
    }

    @Test
    void testFindNonExistentRegion() throws Exception {
        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
                createSpilledRegionManager(
                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
            long regionOffset = spilledRegionManager.findRegion(0, 0, true);
            // -1 means the region was not found, and nothing may have been cached.
            assertThat(regionOffset).isEqualTo(-1);
            assertThat(cachedRegionFuture).isNotCompleted();
        }
    }

    @Test
    void testAppendOrOverwriteRegion() throws Exception {
        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
                createSpilledRegionManager(
                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
            InternalRegion region = createSingleUnreleasedRegion(0, 0L, 1);
            // append region to index file.
            spilledRegionManager.appendOrOverwriteRegion(0, region);
            assertThat(cachedRegionFuture).isNotCompleted();
            FileChannel indexFileChannel = FileChannel.open(indexFilePath, StandardOpenOption.READ);
            InternalRegion readRegion =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            indexFileChannel,
                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
                            0L);
            assertRegionEquals(readRegion, region);

            // the new region must have the same size as the old region.
            InternalRegion newRegion = createSingleUnreleasedRegion(0, 10L, 1);
            // overwrite the old region.
            spilledRegionManager.appendOrOverwriteRegion(0, newRegion);
            // appendOrOverwriteRegion must not trigger a cache load.
            assertThat(cachedRegionFuture).isNotCompleted();
            InternalRegion readNewRegion =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            indexFileChannel,
                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
                            0L);
            assertRegionEquals(readNewRegion, newRegion);
        }
    }

    @Test
    void testWriteMoreThanOneSegment() throws Exception {
        List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 2, 2);
        // segment can hold the first two regions but not a third one.
        int segmentSize = regions.stream().mapToInt(InternalRegion::getSize).sum() + 1;
        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) {
            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(0));
            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(1));
            // segment does not have enough space, a new segment will be started.
            InternalRegion regionInNewSegment = createSingleUnreleasedRegion(4, 4L, 2);
            spilledRegionManager.appendOrOverwriteRegion(0, regionInNewSegment);
            FileChannel indexFileChannel = FileChannel.open(indexFilePath, StandardOpenOption.READ);
            InternalRegion readRegion =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            indexFileChannel,
                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
                            // offset is the segment size instead of the two regions' size to prove
                            // that a new segment was started.
                            segmentSize);
            assertRegionEquals(readRegion, regionInNewSegment);
        }
    }

    @Test
    void testWriteBigRegion() throws Exception {
        // deliberately tiny segment so every region exceeds it and monopolizes a segment.
        int segmentSize = 4;
        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
                createSpilledRegionManager(segmentSize, (ignore1, ignore2) -> {})) {
            List<InternalRegion> regions = createAllUnreleasedRegions(0, 0L, 1, 2);
            InternalRegion region1 = regions.get(0);
            InternalRegion region2 = regions.get(1);
            assertThat(region1.getSize()).isGreaterThan(segmentSize);
            assertThat(region2.getSize()).isGreaterThan(segmentSize);

            spilledRegionManager.appendOrOverwriteRegion(0, region1);
            spilledRegionManager.appendOrOverwriteRegion(0, region2);
            FileChannel indexFileChannel = FileChannel.open(indexFilePath, StandardOpenOption.READ);
            InternalRegion readRegion1 =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            indexFileChannel,
                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
                            0L);
            assertRegionEquals(readRegion1, region1);

            // the second big region starts right after the first one, i.e. each big region's
            // segment is exactly the region's size.
            InternalRegion readRegion2 =
                    InternalRegionWriteReadUtils.readRegionFromFile(
                            indexFileChannel,
                            allocateAndConfigureBuffer(InternalRegion.HEADER_SIZE),
                            readRegion1.getSize());
            assertRegionEquals(readRegion2, region2);
        }
    }

    @Test
    void testFindRegionFirstBufferIndexInMultipleSegments() throws Exception {
        final int numBuffersPerRegion = 2;
        final int subpartition = 0;
        List<InternalRegion> loadedRegions = new ArrayList<>();
        try (HsFileDataIndexSpilledRegionManager spilledRegionManager =
                createSpilledRegionManager(
                        // every segment can store two regions.
                        (InternalRegion.HEADER_SIZE + numBuffersPerRegion) * 2,
                        (ignore, region) -> loadedRegions.add(region))) {
            // segment1: region1(0-2), region2(9-11), min: 0, max: 11
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition, createSingleUnreleasedRegion(0, 0L, numBuffersPerRegion));
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition, createSingleUnreleasedRegion(9, 9L, numBuffersPerRegion));

            // segment2: region1(2-4), region2(11-13), min: 2, max: 13
            InternalRegion targetRegion = createSingleUnreleasedRegion(2, 2L, 2);
            spilledRegionManager.appendOrOverwriteRegion(subpartition, targetRegion);
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition, createSingleUnreleasedRegion(11, 11L, 2));

            // segment3: region1(7-9), min: 7, max: 9
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition, createSingleUnreleasedRegion(7, 7L, 2));

            // find the target region; loadToCache = true loads the whole hit segment.
            long regionOffset = spilledRegionManager.findRegion(subpartition, 3, true);
            assertThat(regionOffset).isNotEqualTo(-1L);
            assertThat(loadedRegions).hasSize(2);
            // the target region must be put into the cache last.
            assertRegionEquals(loadedRegions.get(1), targetRegion);
        }
    }

    // Convenience overload using the default segment size.
    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
        return createSpilledRegionManager(256, cacheRegionConsumer);
    }

    private HsFileDataIndexSpilledRegionManager createSpilledRegionManager(
            int segmentSize, BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
        int numSubpartitions = 2;
        // Long.MAX_VALUE cache capacity makes the manager load entire segments into the cache.
        return new HsFileDataIndexSpilledRegionManagerImpl.Factory(segmentSize, Long.MAX_VALUE)
                .create(numSubpartitions, indexFilePath, cacheRegionConsumer);
    }
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index bd26b6d56a8..279f12d2782 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -24,12 +24,15 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test utils for hybrid shuffle mode. */
 public class HybridShuffleTestUtils {
     public static final int MEMORY_SEGMENT_SIZE = 128;
@@ -69,4 +72,38 @@ public class HybridShuffleTestUtils {
     public static HsOutputMetrics createTestingOutputMetrics() {
         return new HsOutputMetrics(new TestCounter(), new TestCounter());
     }
+
+    public static InternalRegion createSingleUnreleasedRegion(
+            int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion) {
+        return new InternalRegion(
+                firstBufferIndex,
+                firstBufferOffset,
+                numBuffersPerRegion,
+                new boolean[numBuffersPerRegion]);
+    }
+
+    public static List<InternalRegion> createAllUnreleasedRegions(
+            int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion, int numRegions) {
+        List<InternalRegion> regions = new ArrayList<>();
+        int bufferIndex = firstBufferIndex;
+        long bufferOffset = firstBufferOffset;
+        for (int i = 0; i < numRegions; i++) {
+            regions.add(
+                    new InternalRegion(
+                            bufferIndex,
+                            bufferOffset,
+                            numBuffersPerRegion,
+                            new boolean[numBuffersPerRegion]));
+            bufferIndex += numBuffersPerRegion;
+            bufferOffset += bufferOffset;
+        }
+        return regions;
+    }
+
+    public static void assertRegionEquals(InternalRegion expected, 
InternalRegion region) {
+        
assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex());
+        
assertThat(region.getFirstBufferOffset()).isEqualTo(expected.getFirstBufferOffset());
+        assertThat(region.getNumBuffers()).isEqualTo(region.getNumBuffers());
+        assertThat(region.getReleased()).isEqualTo(region.getReleased());
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
new file mode 100644
index 00000000000..0c287c9fb25
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingFileDataIndexSpilledRegionManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+/** Mock {@link HsFileDataIndexSpilledRegionManager} for testing. */
+public class TestingFileDataIndexSpilledRegionManager
+        implements HsFileDataIndexSpilledRegionManager {
+    private final List<TreeMap<Integer, InternalRegion>> regions;
+
+    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
+
+    private int findRegionInvoked = 0;
+
+    public TestingFileDataIndexSpilledRegionManager(
+            int numSubpartitions, BiConsumer<Integer, InternalRegion> 
cacheRegionConsumer) {
+        this.regions = new ArrayList<>();
+        this.cacheRegionConsumer = cacheRegionConsumer;
+        for (int i = 0; i < numSubpartitions; i++) {
+            regions.add(new TreeMap<>());
+        }
+    }
+
+    @Nullable
+    public InternalRegion getRegion(int subpartition, int bufferIndex) {
+        return regions.get(subpartition).get(bufferIndex);
+    }
+
+    public int getSpilledRegionSize(int subpartition) {
+        return regions.get(subpartition).size();
+    }
+
+    public int getFindRegionInvoked() {
+        return findRegionInvoked;
+    }
+
+    @Override
+    public void appendOrOverwriteRegion(int subpartition, InternalRegion 
region)
+            throws IOException {
+        regions.get(subpartition).put(region.getFirstBufferIndex(), region);
+    }
+
+    @Override
+    public long findRegion(int subpartition, int bufferIndex, boolean 
loadToCache) {
+        findRegionInvoked++;
+        InternalRegion region = regions.get(subpartition).get(bufferIndex);
+        if (region == null) {
+            return -1;
+        } else {
+            // return non -1 value indicates this region is exists.
+            if (loadToCache) {
+                cacheRegionConsumer.accept(subpartition, region);
+            }
+            return 1;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing.
+    }
+
+    /** Factory of {@link TestingFileDataIndexSpilledRegionManager}. */
+    public static class Factory implements 
HsFileDataIndexSpilledRegionManager.Factory {
+        public static final Factory INSTANCE = new Factory();
+
+        public TestingFileDataIndexSpilledRegionManager 
lastSpilledRegionManager;
+
+        public TestingFileDataIndexSpilledRegionManager 
getLastSpilledRegionManager() {
+            return lastSpilledRegionManager;
+        }
+
+        @Override
+        public HsFileDataIndexSpilledRegionManager create(
+                int numSubpartitions,
+                Path indexFilePath,
+                BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+            TestingFileDataIndexSpilledRegionManager 
testingFileDataIndexSpilledRegionManager =
+                    new TestingFileDataIndexSpilledRegionManager(
+                            numSubpartitions, cacheRegionConsumer);
+            lastSpilledRegionManager = 
testingFileDataIndexSpilledRegionManager;
+            return testingFileDataIndexSpilledRegionManager;
+        }
+    }
+}


Reply via email to