This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e6dc74ac9c7e33e50296e6c86d658bc86149a876 Author: Weijie Guo <[email protected]> AuthorDate: Thu Jan 5 15:24:58 2023 +0800 [FLINK-30332][network] Introduce HsFileDataIndexCache to cache and manage index data. --- .../partition/hybrid/HsFileDataIndexCache.java | 245 +++++++++++++++++++++ .../partition/hybrid/HsFileDataIndexCacheTest.java | 142 ++++++++++++ 2 files changed, 387 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java new file mode 100644 index 00000000000..3b05d8d6fa0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCache.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.TreeMap; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A cache layer of hybrid data index. This class encapsulates the logic of the index's put and get, + * and automatically caches some indexes in memory. When there are too many cached indexes, it is + * this class's responsibility to decide and eliminate some indexes to disk. + */ +public class HsFileDataIndexCache { + /** + * This struct stores all in memory {@link InternalRegion}s. Each element is a treeMap contains + * all in memory {@link InternalRegion}'s of specific subpartition corresponding to the + * subscript. The value of this treeMap is a {@link InternalRegion}, and the key is + * firstBufferIndex of this region. Only cached in memory region will be put to here. + */ + private final List<TreeMap<Integer, InternalRegion>> + subpartitionFirstBufferIndexInternalRegions; + + /** + * This cache is used to help eliminate regions from memory. It is only maintains the key of + * each in memory region, the value is just a placeholder. Note that this internal cache must be + * consistent with subpartitionFirstBufferIndexInternalRegions, that means both of them must add + * or delete elements at the same time. + */ + private final Cache<CachedRegionKey, Object> internalCache; + + private final HsFileDataIndexSpilledRegionManager spilledRegionManager; + + private final Path indexFilePath; + + /** + * Placeholder of cache entry's value. Because the cache is only used for managing region's + * elimination, does not need the real region as value. + */ + public static final Object PLACEHOLDER = new Object(); + + public HsFileDataIndexCache( + int numSubpartitions, + Path indexFilePath, + long numRetainedInMemoryRegionsMax, + HsFileDataIndexSpilledRegionManager.Factory spilledRegionManagerFactory) { + this.subpartitionFirstBufferIndexInternalRegions = new ArrayList<>(numSubpartitions); + for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) { + subpartitionFirstBufferIndexInternalRegions.add(new TreeMap<>()); + } + this.internalCache = + CacheBuilder.newBuilder() + .maximumSize(numRetainedInMemoryRegionsMax) + .removalListener(this::handleRemove) + .build(); + this.indexFilePath = checkNotNull(indexFilePath); + this.spilledRegionManager = + spilledRegionManagerFactory.create( + numSubpartitions, + indexFilePath, + (subpartition, region) -> { + if (!getCachedRegionContainsTargetBufferIndex( + subpartition, region.getFirstBufferIndex()) + .isPresent()) { + subpartitionFirstBufferIndexInternalRegions + .get(subpartition) + .put(region.getFirstBufferIndex(), region); + internalCache.put( + new CachedRegionKey( + subpartition, region.getFirstBufferIndex()), + PLACEHOLDER); + } else { + // this is needed for cache entry remove algorithm like LRU. + internalCache.getIfPresent( + new CachedRegionKey( + subpartition, region.getFirstBufferIndex())); + } + }); + } + + /** + * Get a region contains target bufferIndex and belong to target subpartition. + * + * @param subpartitionId the subpartition that target buffer belong to. + * @param bufferIndex the index of target buffer. + * @return If target region can be founded from memory or disk, return optional contains target + * region. Otherwise, return {@code Optional#empty()}; + */ + public Optional<InternalRegion> get(int subpartitionId, int bufferIndex) { + // first of all, try to get region in memory. + Optional<InternalRegion> regionOpt = + getCachedRegionContainsTargetBufferIndex(subpartitionId, bufferIndex); + if (regionOpt.isPresent()) { + InternalRegion region = regionOpt.get(); + checkNotNull( + // this is needed for cache entry remove algorithm like LRU. + internalCache.getIfPresent( + new CachedRegionKey(subpartitionId, region.getFirstBufferIndex()))); + return Optional.of(region); + } else { + // try to find target region and load it into cache if founded. + spilledRegionManager.findRegion(subpartitionId, bufferIndex, true); + return getCachedRegionContainsTargetBufferIndex(subpartitionId, bufferIndex); + } + } + + /** + * Put regions to cache. + * + * @param subpartition the subpartition's id of regions. + * @param internalRegions regions to be cached. + */ + public void put(int subpartition, List<InternalRegion> internalRegions) { + TreeMap<Integer, InternalRegion> treeMap = + subpartitionFirstBufferIndexInternalRegions.get(subpartition); + for (InternalRegion internalRegion : internalRegions) { + internalCache.put( + new CachedRegionKey(subpartition, internalRegion.getFirstBufferIndex()), + PLACEHOLDER); + treeMap.put(internalRegion.getFirstBufferIndex(), internalRegion); + } + } + + /** + * Close {@link HsFileDataIndexCache}, this will delete the index file. After that, the index + * can no longer be read or written. + */ + public void close() throws IOException { + spilledRegionManager.close(); + IOUtils.deleteFileQuietly(indexFilePath); + } + + // This is a callback after internal cache removed an entry from itself. + private void handleRemove(RemovalNotification<CachedRegionKey, Object> removedEntry) { + CachedRegionKey removedKey = removedEntry.getKey(); + // remove the corresponding region from memory. + InternalRegion removedRegion = + subpartitionFirstBufferIndexInternalRegions + .get(removedKey.getSubpartition()) + .remove(removedKey.getFirstBufferIndex()); + + // write this region to file. After that, no strong reference point to this region, it can + // be safely released by gc. + writeRegion(removedKey.getSubpartition(), removedRegion); + } + + private void writeRegion(int subpartition, InternalRegion region) { + try { + spilledRegionManager.appendOrOverwriteRegion(subpartition, region); + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } + } + + /** + * Get the cached in memory region contains target buffer. + * + * @param subpartitionId the subpartition that target buffer belong to. + * @param bufferIndex the index of target buffer. + * @return If target region is cached in memory, return optional contains target region. + * Otherwise, return {@code Optional#empty()}; + */ + private Optional<InternalRegion> getCachedRegionContainsTargetBufferIndex( + int subpartitionId, int bufferIndex) { + return Optional.ofNullable( + subpartitionFirstBufferIndexInternalRegions + .get(subpartitionId) + .floorEntry(bufferIndex)) + .map(Map.Entry::getValue) + .filter(internalRegion -> internalRegion.containBuffer(bufferIndex)); + } + + /** + * This class represents the key of cached region, it is uniquely identified by the region's + * subpartition id and firstBufferIndex. + */ + private static class CachedRegionKey { + /** The subpartition id of cached region. */ + private final int subpartition; + + /** The first buffer's index of cached region. */ + private final int firstBufferIndex; + + public CachedRegionKey(int subpartition, int firstBufferIndex) { + this.subpartition = subpartition; + this.firstBufferIndex = firstBufferIndex; + } + + public int getSubpartition() { + return subpartition; + } + + public int getFirstBufferIndex() { + return firstBufferIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CachedRegionKey that = (CachedRegionKey) o; + return subpartition == that.subpartition && firstBufferIndex == that.firstBufferIndex; + } + + @Override + public int hashCode() { + return Objects.hash(subpartition, firstBufferIndex); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java new file mode 100644 index 00000000000..5e1f79dce32 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexCacheTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HsFileDataIndexCache}. */ +class HsFileDataIndexCacheTest { + private HsFileDataIndexCache indexCache; + + private TestingFileDataIndexSpilledRegionManager spilledRegionManager; + + private final int numSubpartitions = 1; + + private static final int SPILLED_INDEX_SEGMENT_SIZE = 256; + + private int numRetainedIndexEntry = 10; + + @BeforeEach + void before(@TempDir Path tmpPath) throws Exception { + Path indexFilePath = Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + indexCache = + new HsFileDataIndexCache( + numSubpartitions, + indexFilePath, + numRetainedIndexEntry, + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); + spilledRegionManager = + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE + .getLastSpilledRegionManager(); + } + + @Test + void testPutAndGet() { + indexCache.put(0, createAllUnreleasedRegions(0, 0L, 3, 1)); + Optional<InternalRegion> regionOpt = indexCache.get(0, 0); + assertThat(regionOpt) + .hasValueSatisfying( + (region) -> { + assertThat(region.getFirstBufferIndex()).isEqualTo(0); + assertThat(region.getFirstBufferOffset()).isEqualTo(0); + assertThat(region.getNumBuffers()).isEqualTo(3); + }); + } + + @Test + void testCachedRegionRemovedWhenExceedsRetainedEntry(@TempDir Path tmpPath) throws Exception { + numRetainedIndexEntry = 3; + Path indexFilePath = Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + indexCache = + new HsFileDataIndexCache( + numSubpartitions, + indexFilePath, + numRetainedIndexEntry, + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); + spilledRegionManager = + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE + .getLastSpilledRegionManager(); + + // region 0, 1, 2 + List<InternalRegion> regionList = createAllUnreleasedRegions(0, 0L, 3, 3); + indexCache.put(0, regionList); + assertThat(spilledRegionManager.getSpilledRegionSize(0)).isZero(); + + // number of regions exceeds numRetainedIndexEntry, trigger cache purge. + indexCache.put(0, createAllUnreleasedRegions(9, 9L, 3, 1)); + assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1); + InternalRegion region = spilledRegionManager.getRegion(0, 0); + assertThat(region).isNotNull(); + assertRegionEquals(region, regionList.get(0)); + // number of regions exceeds numRetainedIndexEntry, trigger cache purge. + indexCache.put(0, createAllUnreleasedRegions(12, 12L, 3, 1)); + assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2); + InternalRegion region2 = spilledRegionManager.getRegion(0, 3); + assertThat(region2).isNotNull(); + assertRegionEquals(region2, regionList.get(1)); + } + + @Test + void testGetNonExistentRegion() { + Optional<InternalRegion> region = indexCache.get(0, 0); + // get a non-existent region. + assertThat(region).isNotPresent(); + } + + @Test + void testCacheLoadSpilledRegion(@TempDir Path tmpPath) throws Exception { + numRetainedIndexEntry = 1; + Path indexFilePath = Files.createFile(tmpPath.resolve(UUID.randomUUID().toString())); + indexCache = + new HsFileDataIndexCache( + numSubpartitions, + indexFilePath, + numRetainedIndexEntry, + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE); + spilledRegionManager = + TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE + .getLastSpilledRegionManager(); + + indexCache.put(0, createAllUnreleasedRegions(0, 0L, 1, 2)); + assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(1); + + assertThat(spilledRegionManager.getFindRegionInvoked()).isZero(); + Optional<InternalRegion> regionOpt = indexCache.get(0, 0); + assertThat(spilledRegionManager.getSpilledRegionSize(0)).isEqualTo(2); + assertThat(regionOpt).isPresent(); + assertThat(spilledRegionManager.getFindRegionInvoked()).isEqualTo(1); + // previously get should already load this region to cache. + assertThat(indexCache.get(0, 0)).isPresent(); + assertThat(spilledRegionManager.getFindRegionInvoked()).isEqualTo(1); + } +}
