xintongsong commented on code in PR #21603:
URL: https://github.com/apache/flink/pull/21603#discussion_r1066635610
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils.allocateAndConfigureBuffer;
+
+/**
+ * Default implementation of {@link HsFileDataIndexSpilledRegionManager}. This manager will handle
+ * and spill regions in the following way:
+ *
+ * <ul>
+ *   <li>All regions will be written to the same file, namely the index file.
+ *   <li>Multiple regions belonging to the same subpartition form a segment.
+ *   <li>The regions in the same segment have no special relationship; they are only related by the
+ *       order in which they are spilled.
+ *   <li>Each segment is independent. Even if the previous segment is not full, the next segment
+ *       can still be allocated.
+ *   <li>If a region has already been written to the index file, spilling it again will overwrite
+ *       the previous region.
+ *   <li>A very large region will monopolize a single segment.
+ * </ul>
+ */
+public class HsFileDataIndexSpilledRegionManagerImpl
+        implements HsFileDataIndexSpilledRegionManager {
+
+    /** Reusable buffer used to read and write the immutable part of a region. */
+    private final ByteBuffer immutablePartBuffer =
+            allocateAndConfigureBuffer(InternalRegion.FIXED_SIZE);
+
+    /**
+     * List of every subpartition's segment metas. Each element is a TreeMap containing all {@link
+     * SegmentMeta}s of the subpartition corresponding to the subscript. The value of this TreeMap
+     * is a {@link SegmentMeta}, and the key is the minBufferIndex of that segment. Only finished
+     * (i.e. no longer appended) segments are put here.
+     */
+    private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas;
+
+    private FileChannel channel;
+
+    /** The offset of the next segment; a new segment will start from this offset. */
+    private long nextSegmentOffset = 0L;
+
+    private final long[] subpartitionCurrentOffset;
+
+    /** Free space of every subpartition's current segment. */
+    private final int[] subpartitionFreeSpace;
+
+    /** Metadata of every subpartition's current segment. */
+    private final SegmentMeta[] currentSegmentMeta;
+
+    /**
+     * Default size of a segment. If the size of a region is larger than this value, it will be
+     * allocated and occupy a single segment.
+     */
+    private final int segmentSize;
+
+    /**
+     * This consumer is used to load a region into the cache. The first parameter is the
+     * subpartition id, and the second parameter is the region to load.
+     */
+    private final BiConsumer<Integer, InternalRegion> cacheRegionConsumer;
+
+    public HsFileDataIndexSpilledRegionManagerImpl(
+            int numSubpartitions,
+            Path indexFilePath,
+            int segmentSize,
+            BiConsumer<Integer, InternalRegion> cacheRegionConsumer) {
+        try {
+            this.channel =
+                    FileChannel.open(
+                            indexFilePath,
+                            StandardOpenOption.CREATE_NEW,
+                            StandardOpenOption.READ,
+                            StandardOpenOption.WRITE);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+        this.subpartitionFinishedSegmentMetas = new ArrayList<>(numSubpartitions);
+        this.subpartitionCurrentOffset = new long[numSubpartitions];
+        this.subpartitionFreeSpace = new int[numSubpartitions];
+        this.currentSegmentMeta = new SegmentMeta[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            subpartitionFinishedSegmentMetas.add(new TreeMap<>());
+        }
+        this.cacheRegionConsumer = cacheRegionConsumer;
+        this.segmentSize = segmentSize;
+    }
+
+    @Override
+    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
+        // first of all, find the region in the segment currently being written.
+        SegmentMeta segmentMeta = currentSegmentMeta[subpartition];
+        if (segmentMeta != null) {
+            long regionOffset =
+                    findRegionInSegment(subpartition, bufferIndex, segmentMeta, loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+
+        // next, find the region in the finished segments.
+        TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap =
+                subpartitionFinishedSegmentMetas.get(subpartition);
+        // all segments with a minBufferIndex less than or equal to the target buffer index may
+        // contain the target region.
+        for (SegmentMeta meta :
+                subpartitionSegmentMetaTreeMap.headMap(bufferIndex, true).values()) {
+            long regionOffset = findRegionInSegment(subpartition, bufferIndex, meta, loadToCache);
+            if (regionOffset != -1) {
+                return regionOffset;
+            }
+        }
+        return -1;
+    }
+
+    private long findRegionInSegment(
+            int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) {
+        if (bufferIndex < meta.getMaxBufferIndex()) {
+            try {
+                // read all regions belonging to this segment.
+                List<Tuple2<InternalRegion, Long>> regionAndOffsets =
+                        readSegment(meta.getOffset(), meta.getNumRegions());
+                for (Tuple2<InternalRegion, Long> regionAndOffset : regionAndOffsets) {
+                    InternalRegion region = regionAndOffset.f0;
+                    // check whether the region contains this buffer.
+                    if (region.containBuffer(bufferIndex)) {
+                        // target region is found.
+                        if (loadToCache) {
+                            // load this region into the cache if needed.
+                            cacheRegionConsumer.accept(subpartition, region);
+                        }
+                        // return the offset of the target region.
+                        return regionAndOffset.f1;
+                    }
+                }

Review Comment:
   Maybe we should cache all the loaded regions. The IO price is already paid.
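
   A rough sketch of what that could look like, reusing the fields and helpers shown in the diff above. The catch clause of `findRegionInSegment` is outside the quoted hunk, so this assumes `readSegment` declares `IOException` and mirrors the `ExceptionUtils.rethrow` pattern from the constructor; `targetRegionOffset` is just a local name for illustration.

   ```java
   private long findRegionInSegment(
           int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) {
       if (bufferIndex < meta.getMaxBufferIndex()) {
           try {
               // one disk read still loads the whole segment, exactly as before.
               List<Tuple2<InternalRegion, Long>> regionAndOffsets =
                       readSegment(meta.getOffset(), meta.getNumRegions());
               long targetRegionOffset = -1;
               for (Tuple2<InternalRegion, Long> regionAndOffset : regionAndOffsets) {
                   InternalRegion region = regionAndOffset.f0;
                   if (loadToCache) {
                       // cache every region that was read, not only the target one,
                       // since the IO price for the segment is already paid.
                       cacheRegionConsumer.accept(subpartition, region);
                   }
                   if (region.containBuffer(bufferIndex)) {
                       // remember the target region's offset, but keep iterating so the
                       // remaining regions of the segment are cached as well.
                       targetRegionOffset = regionAndOffset.f1;
                   }
               }
               return targetRegionOffset;
           } catch (IOException e) {
               ExceptionUtils.rethrow(e);
           }
       }
       return -1;
   }
   ```

   Whether the extra cache entries are worth the churn is a separate trade-off, but caching them needs no additional read beyond the one already performed.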
